This is an automated email from the ASF dual-hosted git repository.

nightowl888 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucenenet.git

commit 2bd1f2c2ea6853d8d46da3ba9760fb90409edeea
Author: Shad Storhaug <[email protected]>
AuthorDate: Sat Nov 6 16:55:11 2021 +0700

    Lucene.Net.Replicator.ReplicationClient: Refactored to use ThreadJob so exceptions are re-thrown on the calling thread. Added missing locks.
---
 src/Lucene.Net.Replicator/LocalReplicator.cs   |  30 +--
 src/Lucene.Net.Replicator/ReplicationClient.cs | 304 +++++++++++--------------
 2 files changed, 151 insertions(+), 183 deletions(-)

diff --git a/src/Lucene.Net.Replicator/LocalReplicator.cs 
b/src/Lucene.Net.Replicator/LocalReplicator.cs
index aee5c18..7405e13 100644
--- a/src/Lucene.Net.Replicator/LocalReplicator.cs
+++ b/src/Lucene.Net.Replicator/LocalReplicator.cs
@@ -123,7 +123,7 @@ namespace Lucene.Net.Replicator
 
         private long expirationThreshold = 
DEFAULT_SESSION_EXPIRATION_THRESHOLD;
 
-        private readonly object padlock = new object();
+        private readonly object syncLock = new object();
 
         private volatile RefCountedRevision currentRevision;
         private volatile bool disposed = false;
@@ -160,7 +160,7 @@ namespace Lucene.Net.Replicator
         /// <exception cref="ObjectDisposedException">This replicator has 
already been disposed.</exception>
         protected void EnsureOpen()
         {
-            UninterruptableMonitor.Enter(padlock);
+            UninterruptableMonitor.Enter(syncLock);
             try
             {
                 if (!disposed)
@@ -170,13 +170,13 @@ namespace Lucene.Net.Replicator
             }
             finally
             {
-                UninterruptableMonitor.Exit(padlock);
+                UninterruptableMonitor.Exit(syncLock);
             }
         }
 
         public virtual SessionToken CheckForUpdate(string currentVersion)
         {
-            UninterruptableMonitor.Enter(padlock);
+            UninterruptableMonitor.Enter(syncLock);
             try
             {
                 EnsureOpen();
@@ -196,7 +196,7 @@ namespace Lucene.Net.Replicator
             }
             finally
             {
-                UninterruptableMonitor.Exit(padlock);
+                UninterruptableMonitor.Exit(syncLock);
             }
         }
 
@@ -205,7 +205,7 @@ namespace Lucene.Net.Replicator
             if (disposed || !disposing)
                 return;
 
-            UninterruptableMonitor.Enter(padlock);
+            UninterruptableMonitor.Enter(syncLock);
             try
             {
                 foreach (ReplicationSession session in sessions.Values)
@@ -214,7 +214,7 @@ namespace Lucene.Net.Replicator
             }
             finally
             {
-                UninterruptableMonitor.Exit(padlock);
+                UninterruptableMonitor.Exit(syncLock);
             }
             disposed = true;
         }
@@ -237,7 +237,7 @@ namespace Lucene.Net.Replicator
             get => expirationThreshold;
             set
             {
-                UninterruptableMonitor.Enter(padlock);
+                UninterruptableMonitor.Enter(syncLock);
                 try
                 {
                     EnsureOpen();
@@ -246,14 +246,14 @@ namespace Lucene.Net.Replicator
                 }
                 finally
                 {
-                    UninterruptableMonitor.Exit(padlock);
+                    UninterruptableMonitor.Exit(syncLock);
                 }
             }
         }
 
         public virtual Stream ObtainFile(string sessionId, string source, 
string fileName)
         {
-            UninterruptableMonitor.Enter(padlock);
+            UninterruptableMonitor.Enter(syncLock);
             try
             {
                 EnsureOpen();
@@ -273,13 +273,13 @@ namespace Lucene.Net.Replicator
             }
             finally
             {
-                UninterruptableMonitor.Exit(padlock);
+                UninterruptableMonitor.Exit(syncLock);
             }
         }
 
         public virtual void Publish(IRevision revision)
         {
-            UninterruptableMonitor.Enter(padlock);
+            UninterruptableMonitor.Enter(syncLock);
             try
             {
                 EnsureOpen();
@@ -310,14 +310,14 @@ namespace Lucene.Net.Replicator
             }
             finally
             {
-                UninterruptableMonitor.Exit(padlock);
+                UninterruptableMonitor.Exit(syncLock);
             }
         }
 
         /// <exception cref="InvalidOperationException"></exception>
         public virtual void Release(string sessionId)
         {
-            UninterruptableMonitor.Enter(padlock);
+            UninterruptableMonitor.Enter(syncLock);
             try
             {
                 EnsureOpen();
@@ -325,7 +325,7 @@ namespace Lucene.Net.Replicator
             }
             finally
             {
-                UninterruptableMonitor.Exit(padlock);
+                UninterruptableMonitor.Exit(syncLock);
             }
         }
     }
diff --git a/src/Lucene.Net.Replicator/ReplicationClient.cs 
b/src/Lucene.Net.Replicator/ReplicationClient.cs
index 7604fa1..4546d28 100644
--- a/src/Lucene.Net.Replicator/ReplicationClient.cs
+++ b/src/Lucene.Net.Replicator/ReplicationClient.cs
@@ -1,15 +1,16 @@
-using Lucene.Net.Store;
+using J2N;
+using J2N.Collections.Generic.Extensions;
+using J2N.Threading;
+using Lucene.Net.Diagnostics;
+using Lucene.Net.Store;
 using Lucene.Net.Support.Threading;
 using Lucene.Net.Util;
 using System;
 using System.Collections.Generic;
-using System.Collections.ObjectModel;
-using System.Diagnostics;
 using System.IO;
 using System.Threading;
-using JCG = J2N.Collections.Generic;
 using Directory = Lucene.Net.Store.Directory;
-using Lucene.Net.Diagnostics;
+using JCG = J2N.Collections.Generic;
 
 namespace Lucene.Net.Replicator
 {
@@ -44,25 +45,15 @@ namespace Lucene.Net.Replicator
     /// </remarks>
     public class ReplicationClient : IDisposable
     {
-        // LUCENENET TODO: Check to ensure ThreadInterruptException is being 
passed from the background worker to the current thread, as it is required by 
IndexWriter
-
-        //Note: LUCENENET specific, .NET does not work with Threads in the 
same way as Java does, so we mimic the same behavior using the ThreadPool 
instead.
-        private class ReplicationThread
+        private class ReplicationThread : ThreadJob
         {
+            private readonly long intervalMillis;
+            private readonly ReentrantLock updateLock;
             private readonly Action doUpdate;
-            private readonly Action<Exception> handleException;
-            private readonly ReentrantLock @lock;
-            private readonly object controlLock = new object();
-
-            private readonly long interval;
-            private readonly AutoResetEvent handle = new AutoResetEvent(false);
+            private readonly Action<Exception> handleUpdateException;
 
-            private AutoResetEvent stopHandle;
-
-            /// <summary>
-            /// Gets or sets the name
-            /// </summary>
-            public string Name { get; private set; }
+            // client uses this to stop us
+            internal readonly CountdownEvent stop = new CountdownEvent(1);
 
             /// <summary>
             /// 
@@ -70,117 +61,59 @@ namespace Lucene.Net.Replicator
             /// <param name="intervalMillis">The interval in 
milliseconds.</param>
             /// <param name="threadName">The thread name.</param>
             /// <param name="doUpdate">A delegate to call to perform the 
update.</param>
-            /// <param name="handleException">A delegate to call to handle an 
exception.</param>
-            /// <param name="lock"></param>
-            public ReplicationThread(long intervalMillis, string threadName, 
Action doUpdate, Action<Exception> handleException, ReentrantLock @lock)
+            /// <param name="handleUpdateException">A delegate to call to 
handle an exception.</param>
+            /// <param name="updateLock"></param>
+            public ReplicationThread(long intervalMillis, string threadName, 
Action doUpdate, Action<Exception> handleUpdateException, ReentrantLock 
updateLock)
+                : base(threadName)
             {
-                this.doUpdate = doUpdate;
-                this.handleException = handleException;
-                this.@lock = @lock;
-                Name = threadName;
-                this.interval = intervalMillis;
+                this.intervalMillis = intervalMillis;
+                this.doUpdate = doUpdate ?? throw new 
ArgumentNullException(nameof(doUpdate));
+                this.handleUpdateException = handleUpdateException ?? throw 
new ArgumentNullException(nameof(handleUpdateException));
+                this.updateLock = updateLock ?? throw new 
ArgumentNullException(nameof(updateLock));
             }
 
-            /// <summary>
-            /// 
-            /// </summary>
-            public bool IsAlive { get; private set; }
-
-            /// <summary>
-            /// 
-            /// </summary>
-            public void Start()
-            {
-                UninterruptableMonitor.Enter(controlLock);
-                try
-                {
-                    if (IsAlive)
-                        return;
-                    IsAlive = true;
-                }
-                finally
-                {
-                    UninterruptableMonitor.Exit(controlLock);
-                }
-                RegisterWait(interval);
-            }
-
-            /// <summary>
-            /// 
-            /// </summary>
-            public void Stop()
+            public override void Run()
             {
-                UninterruptableMonitor.Enter(controlLock);
-                try
+                while (true)
                 {
-                    if (!IsAlive)
-                        return;
-                    IsAlive = false;
-                }
-                finally
-                {
-                    UninterruptableMonitor.Exit(controlLock);
-                }
-                stopHandle = new AutoResetEvent(false);
-
-                //NOTE: Execute any outstanding, this execution will terminate 
almost instantaniously if it's not already running.
-                ExecuteImmediately();
-
-                stopHandle.WaitOne();
-                stopHandle = null;
-            }
-
-            /// <summary>
-            /// Executes the next cycle of work immediately
-            /// </summary>
-            public void ExecuteImmediately()
-            {
-                handle.Set();
-            }
-
-            private void RegisterWait(long timeout)
-            {
-                //NOTE: We don't care about timedout as it can either be 
because we was requested to run immidiately or stop.
-                if (IsAlive)
-                    ThreadPool.RegisterWaitForSingleObject(handle, (state, 
timedout) => Run(), null, timeout, true);
-                else
-                    SignalStop();
-            }
-
-            private void SignalStop()
-            {
-                if (stopHandle != null)
-                    stopHandle.Set();
-            }
-
-            private void Run()
-            {
-                if (!IsAlive)
-                {
-                    SignalStop();
-                    return;
-                }
+                    long time = Time.NanoTime() / 
Time.MillisecondsPerNanosecond;
+                    updateLock.Lock();
+                    try
+                    {
+                        doUpdate();
+                    }
+                    catch (Exception t) when (t.IsThrowable())
+                    {
+                        handleUpdateException(t);
+                    }
+                    finally
+                    {
+                        updateLock.Unlock();
+                    }
+                    time = Time.NanoTime() / Time.MillisecondsPerNanosecond - 
time;
 
-                Stopwatch timer = Stopwatch.StartNew();
-                @lock.Lock();
-                try
-                {
-                    doUpdate();
-                }
-                catch (Exception t) when (t.IsThrowable())
-                {
-                    handleException(t);
-                }
-                finally
-                {
-                    @lock.Unlock();
-
-                    timer.Stop();
-                    long driftAdjusted = Math.Max(interval - 
timer.ElapsedMilliseconds, 0);
-                    if (IsAlive)
-                        RegisterWait(driftAdjusted);
-                    else
-                        SignalStop();
+                    // adjust timeout to compensate the time spent doing the 
replication.
+                    long timeout = intervalMillis - time;
+                    if (timeout > 0)
+                    {
+                        try
+                        {
+                            // this will return immediately if we were ordered 
to stop (count=0)
+                            // or the timeout has elapsed. if it returns true, 
it means count=0,
+                            // so terminate.
+                            if (stop.Wait(TimeSpan.FromMilliseconds(timeout))) 
//  await(timeout, TimeUnit.MILLISECONDS))
+                            {
+                                return;
+                            }
+                        }
+                        catch (Exception e) when (e.IsInterruptedException())
+                        {
+                            // if we were interrupted, somebody wants to 
terminate us, so just
+                            // throw the exception further.
+                            Thread.CurrentThread.Interrupt();
+                            throw new Util.ThreadInterruptedException(e);
+                        }
+                    }
                 }
             }
         }
@@ -199,6 +132,7 @@ namespace Lucene.Net.Replicator
 
         private readonly byte[] copyBuffer = new byte[16384];
         private readonly ReentrantLock updateLock = new ReentrantLock();
+        private readonly object syncLock = new object(); // LUCENENET specific 
to avoid lock (this)
 
         private ReplicationThread updateThread;
         private bool disposed = false;
@@ -239,13 +173,13 @@ namespace Lucene.Net.Replicator
                 string version = handler.CurrentVersion;
                 session = replicator.CheckForUpdate(version);
 
-                WriteToInfoStream(string.Format("doUpdate(): 
handlerVersion={0} session={1}", version, session));
+                WriteToInfoStream(string.Format("DoUpdate(): 
handlerVersion={0} session={1}", version, session));
 
                 if (session == null)
                     return;
 
                 IDictionary<string, IList<RevisionFile>> requiredFiles = 
RequiredFiles(session.SourceFiles);
-                WriteToInfoStream(string.Format("doUpdate(): 
handlerVersion={0} session={1}", version, session));
+                WriteToInfoStream(string.Format("DoUpdate(): 
handlerVersion={0} session={1}", version, session));
 
                 foreach (KeyValuePair<string, IList<RevisionFile>> pair in 
requiredFiles)
                 {
@@ -260,7 +194,7 @@ namespace Lucene.Net.Replicator
                         if (disposed)
                         {
                             // if we're closed, abort file copy
-                            WriteToInfoStream("doUpdate(): detected client was 
closed); abort file copy");
+                            WriteToInfoStream("DoUpdate(): detected client was 
closed); abort file copy");
                             return;
                         }
 
@@ -281,9 +215,9 @@ namespace Lucene.Net.Replicator
                             IOUtils.Dispose(input, output);
                         }
                     }
-                    // only notify if all required files were successfully 
obtained.
-                    notify = true;
                 }
+                // only notify if all required files were successfully 
obtained.
+                notify = true;
             }
             finally
             {
@@ -312,7 +246,8 @@ namespace Lucene.Net.Replicator
             {
                 if (notify && !disposed)
                 { // no use to notify if we are closed already
-                    handler.RevisionReady(session.Version, 
session.SourceFiles, new ReadOnlyDictionary<string, 
IList<string>>(copiedFiles), sourceDirectory);
+                    // LUCENENET specific - pass the copiedFiles as read only
+                    handler.RevisionReady(session.Version, 
session.SourceFiles, copiedFiles.AsReadOnly(), sourceDirectory);
                 }
             }
             finally
@@ -398,12 +333,20 @@ namespace Lucene.Net.Replicator
 
         protected virtual void Dispose(bool disposing)
         {
-            if (disposed || !disposing)
-                return;
+            UninterruptableMonitor.Enter(syncLock);
+            try
+            {
+                if (disposed || !disposing)
+                    return;
 
-            StopUpdateThread();
-            infoStream.Dispose(); // LUCENENET specific
-            disposed = true;
+                StopUpdateThread();
+                infoStream.Dispose(); // LUCENENET specific
+                disposed = true;
+            }
+            finally
+            {
+                UninterruptableMonitor.Exit(syncLock);
+            }
         }
 
         public void Dispose()
@@ -415,21 +358,29 @@ namespace Lucene.Net.Replicator
         /// <summary>
         /// Start the update thread with the specified interval in 
milliseconds. For
         /// debugging purposes, you can optionally set the name to set on
-        /// <see cref="ReplicationThread.Name"/>. If you pass <c>null</c>, a 
default name
+        /// <see cref="ThreadJob.Name"/>. If you pass <c>null</c>, a default 
name
         /// will be set.
         /// </summary>
         /// <exception cref="InvalidOperationException"> if the thread has 
already been started </exception>
-        public virtual void StartUpdateThread(long intervalMillis, string 
threadName)
+        public virtual void StartUpdateThread(long intervalInMilliseconds, 
string threadName)
         {
-            EnsureOpen();
-            if (updateThread != null && updateThread.IsAlive)
-                throw IllegalStateException.Create("cannot start an update 
thread when one is running, must first call 'stopUpdateThread()'");
-
-            threadName = threadName == null ? INFO_STREAM_COMPONENT : 
"ReplicationThread-" + threadName;
-            updateThread = new ReplicationThread(intervalMillis, threadName, 
DoUpdate, HandleUpdateException, updateLock);
-            updateThread.Start();
-            // we rely on isAlive to return true in isUpdateThreadAlive, 
assert to be on the safe side
-            if (Debugging.AssertsEnabled) 
Debugging.Assert(updateThread.IsAlive, "updateThread started but not alive?");
+            UninterruptableMonitor.Enter(syncLock);
+            try
+            {
+                EnsureOpen();
+                if (updateThread != null && updateThread.IsAlive)
+                    throw IllegalStateException.Create("cannot start an update 
thread when one is running, must first call 'stopUpdateThread()'");
+
+                threadName = threadName == null ? INFO_STREAM_COMPONENT : 
"ReplicationThread-" + threadName;
+                updateThread = new ReplicationThread(intervalInMilliseconds, 
threadName, DoUpdate, HandleUpdateException, updateLock);
+                updateThread.Start();
+                // we rely on isAlive to return true in isUpdateThreadAlive, 
assert to be on the safe side
+                if (Debugging.AssertsEnabled) 
Debugging.Assert(updateThread.IsAlive, "updateThread started but not alive?");
+            }
+            finally
+            {
+                UninterruptableMonitor.Exit(syncLock);
+            }
         }
 
         /// <summary>
@@ -438,21 +389,31 @@ namespace Lucene.Net.Replicator
         /// </summary>
         public virtual void StopUpdateThread()
         {
-            // this will trigger the thread to terminate if it awaits the lock.
-            // otherwise, if it's in the middle of replication, we wait for it 
to
-            // stop.
-            if (updateThread != null)
+            UninterruptableMonitor.Enter(syncLock);
+            try
             {
-                try
-                {
-                    updateThread.Stop();
-                }
-                catch (Exception ie) when (ie.IsInterruptedException())
+                if (updateThread != null)
                 {
-                    throw new Util.ThreadInterruptedException(ie);
+                    // this will trigger the thread to terminate if it awaits 
the lock.
+                    // otherwise, if it's in the middle of replication, we 
wait for it to
+                    // stop.
+                    updateThread.stop.Signal();
+                    try
+                    {
+                        updateThread.Join();
+                    }
+                    catch (Exception ie) when (ie.IsInterruptedException())
+                    {
+                        Thread.CurrentThread.Interrupt();
+                        throw new Util.ThreadInterruptedException(ie);
+                    }
                 }
+                updateThread = null;
+            }
+            finally
+            {
+                UninterruptableMonitor.Exit(syncLock);
             }
-            updateThread = null;
         }
 
         /// <summary>
@@ -462,11 +423,25 @@ namespace Lucene.Net.Replicator
         /// caused it to terminate (i.e. <see cref="HandleUpdateException"/>
         /// threw the exception further).
         /// </summary>
-        public virtual bool IsUpdateThreadAlive => updateThread != null && 
updateThread.IsAlive;
+        public virtual bool IsUpdateThreadAlive
+        {
+            get
+            {
+                UninterruptableMonitor.Enter(syncLock);
+                try
+                {
+                    return updateThread != null && updateThread.IsAlive;
+                }
+                finally
+                {
+                    UninterruptableMonitor.Exit(syncLock);
+                }
+            }
+        }
 
         public override string ToString()
         {
-            if (updateThread == null)
+            if (updateThread is null)
                 return "ReplicationClient";
             return string.Format("ReplicationClient ({0})", updateThread.Name);
         }
@@ -479,13 +454,6 @@ namespace Lucene.Net.Replicator
         public virtual void UpdateNow() 
         {
             EnsureOpen();
-            if (updateThread != null)
-            {
-                //NOTE: We have a worker running, we use that to perform the 
work instead by requesting it to run
-                //      it's cycle immidiately.
-                updateThread.ExecuteImmediately();
-                return;
-            }
 
             //NOTE: We don't have a worker running, so we just do the work.
             updateLock.Lock();

Reply via email to