This is an automated email from the ASF dual-hosted git repository. nightowl888 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/lucenenet.git
commit 3ba63c9d737a31204d214916184de109c4507234 Author: Shad Storhaug <[email protected]> AuthorDate: Thu Jul 11 14:44:35 2019 +0700 BUG: Fixed ConcurrentMergeScheduler and TaskMergeScheduler so they don't throw exceptions on background threads and properly throw exceptions on the calling thread (fixes #214, fixes #220, fixes LUCENENET-603) --- .../Index/TestConcurrentMergeScheduler.cs | 52 ++++++++++++++++++ .../Index/TestTaskMergeSchedulerExternal.cs | 54 ++++++++++++++++--- src/Lucene.Net/Index/TaskMergeScheduler.cs | 35 +++++++++---- src/Lucene.Net/Support/Threading/ThreadClass.cs | 61 ++++++++++++++++++++-- 4 files changed, 180 insertions(+), 22 deletions(-) diff --git a/src/Lucene.Net.Tests/Index/TestConcurrentMergeScheduler.cs b/src/Lucene.Net.Tests/Index/TestConcurrentMergeScheduler.cs index 7275e62..28d13e1 100644 --- a/src/Lucene.Net.Tests/Index/TestConcurrentMergeScheduler.cs +++ b/src/Lucene.Net.Tests/Index/TestConcurrentMergeScheduler.cs @@ -40,6 +40,7 @@ namespace Lucene.Net.Index using TestUtil = Lucene.Net.Util.TestUtil; using TextField = TextField; using Attributes; + using Lucene.Net.Util; [TestFixture] public class TestConcurrentMergeScheduler : LuceneTestCase @@ -430,6 +431,57 @@ namespace Lucene.Net.Index w.Dispose(); d.Dispose(); } + + // LUCENENET specific + private class FailOnlyOnMerge : MockDirectoryWrapper.Failure + { + public override void Eval(MockDirectoryWrapper dir) + { + // LUCENENET specific: for these to work in release mode, we have added [MethodImpl(MethodImplOptions.NoInlining)] + // to each possible target of the StackTraceHelper. If these change, so must the attribute on the target methods. + if (StackTraceHelper.DoesStackTraceContainMethod("DoMerge")) + { + throw new IOException("now failing during merge"); + } + } + } + + // LUCENENET-603 + [Test, LuceneNetSpecific] + public void TestExceptionOnBackgroundThreadIsPropagatedToCallingThread() + { + using (MockDirectoryWrapper dir = NewMockDirectory()) + { + dir.FailOn(new FailOnlyOnMerge()); + + Document doc = new Document(); + Field idField = NewStringField("id", "", Field.Store.YES); + doc.Add(idField); + + var mergeScheduler = new ConcurrentMergeScheduler(); + using (IndexWriter writer = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random())).SetMergeScheduler(mergeScheduler).SetMaxBufferedDocs(2).SetRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH).SetMergePolicy(NewLogMergePolicy()))) + { + LogMergePolicy logMP = (LogMergePolicy)writer.Config.MergePolicy; + logMP.MergeFactor = 10; + for (int i = 0; i < 20; i++) + { + writer.AddDocument(doc); + } + + bool exceptionHit = false; + try + { + mergeScheduler.Sync(); + } + catch (MergePolicy.MergeException) + { + exceptionHit = true; + } + + assertTrue(exceptionHit); + } + } + } } } #endif \ No newline at end of file diff --git a/src/Lucene.Net.Tests/Index/TestTaskMergeSchedulerExternal.cs b/src/Lucene.Net.Tests/Index/TestTaskMergeSchedulerExternal.cs index bce54ef..e0289db 100644 --- a/src/Lucene.Net.Tests/Index/TestTaskMergeSchedulerExternal.cs +++ b/src/Lucene.Net.Tests/Index/TestTaskMergeSchedulerExternal.cs @@ -1,4 +1,5 @@ -using Lucene.Net.Documents; +using Lucene.Net.Attributes; +using Lucene.Net.Documents; using Lucene.Net.Index; using Lucene.Net.Support; using Lucene.Net.Util; @@ -46,26 +47,26 @@ namespace Lucene.Net.Tests /// </summary> public class TestTaskMergeSchedulerExternal : LuceneTestCase { - internal volatile bool MergeCalled; - internal volatile bool ExcCalled; + internal volatile bool mergeCalled; + internal volatile bool excCalled; private class MyMergeScheduler : TaskMergeScheduler { - private readonly TestTaskMergeSchedulerExternal OuterInstance; + private readonly TestTaskMergeSchedulerExternal outerInstance; public MyMergeScheduler(TestTaskMergeSchedulerExternal outerInstance) { - this.OuterInstance = outerInstance; + this.outerInstance = outerInstance; } protected override void HandleMergeException(Exception t) { - OuterInstance.ExcCalled = true; + outerInstance.excCalled = true; } public override void Merge(IndexWriter writer, MergeTrigger trigger, bool newMergesFound) { - OuterInstance.MergeCalled = true; + outerInstance.mergeCalled = true; base.Merge(writer, trigger, newMergesFound); } } @@ -104,7 +105,7 @@ namespace Lucene.Net.Tests ((MyMergeScheduler)writer.Config.MergeScheduler).Sync(); writer.Dispose(); - Assert.IsTrue(MergeCalled); + assertTrue(mergeCalled); dir.Dispose(); } @@ -146,5 +147,42 @@ namespace Lucene.Net.Tests writer.Dispose(); dir.Dispose(); } + + // LUCENENET-603 + [Test, LuceneNetSpecific] + public void TestExceptionOnBackgroundThreadIsPropagatedToCallingThread() + { + using (MockDirectoryWrapper dir = NewMockDirectory()) + { + dir.FailOn(new FailOnlyOnMerge()); + + Document doc = new Document(); + Field idField = NewStringField("id", "", Field.Store.YES); + doc.Add(idField); + + var mergeScheduler = new TaskMergeScheduler(); + using (IndexWriter writer = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random())).SetMergeScheduler(mergeScheduler).SetMaxBufferedDocs(2).SetRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH).SetMergePolicy(NewLogMergePolicy()))) + { + LogMergePolicy logMP = (LogMergePolicy)writer.Config.MergePolicy; + logMP.MergeFactor = 10; + for (int i = 0; i < 20; i++) + { + writer.AddDocument(doc); + } + + bool exceptionHit = false; + try + { + mergeScheduler.Sync(); + } + catch (MergePolicy.MergeException) + { + exceptionHit = true; + } + + assertTrue(exceptionHit); + } + } + } } } \ No newline at end of file diff --git a/src/Lucene.Net/Index/TaskMergeScheduler.cs b/src/Lucene.Net/Index/TaskMergeScheduler.cs index e875650..c5e8bea 100644 --- a/src/Lucene.Net/Index/TaskMergeScheduler.cs +++ b/src/Lucene.Net/Index/TaskMergeScheduler.cs @@ -61,7 +61,7 @@ namespace Lucene.Net.Index private Directory _directory; /// <summary> - /// <seea cref="IndexWriter"/> that owns this instance. + /// <see cref="IndexWriter"/> that owns this instance. /// </summary> private IndexWriter _writer; @@ -203,12 +203,16 @@ namespace Lucene.Net.Index } catch (AggregateException ae) { - ae.Handle(x => x is OperationCanceledException); - - foreach (var exception in ae.InnerExceptions) + ae.Handle(ex => { - HandleMergeException(ae); - } + if (!(ex is OperationCanceledException)) + { + HandleMergeException(ex); + return true; + } + + return false; + }); } } } @@ -328,6 +332,14 @@ namespace Lucene.Net.Index } } + /// <summary> + /// Does the actual merge, by calling <see cref="IndexWriter.Merge(MergePolicy.OneMerge)"/> </summary> + [MethodImpl(MethodImplOptions.NoInlining)] + protected virtual void DoMerge(MergePolicy.OneMerge merge) + { + _writer.Merge(merge); + } + private void OnMergeThreadCompleted(object sender, EventArgs e) { var mergeThread = sender as MergeThread; @@ -352,7 +364,7 @@ namespace Lucene.Net.Index var count = Interlocked.Increment(ref _mergeThreadCount); var name = string.Format("Lucene Merge Task #{0}", count); - return new MergeThread(name, writer, merge, writer.infoStream, Verbose, _manualResetEvent, HandleMergeException); + return new MergeThread(name, writer, merge, writer.infoStream, Verbose, _manualResetEvent, HandleMergeException, DoMerge); } /// <summary> @@ -433,6 +445,7 @@ namespace Lucene.Net.Index private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(); private readonly ManualResetEventSlim _resetEvent; private readonly Action<Exception> _exceptionHandler; + private readonly Action<MergePolicy.OneMerge> _doMerge; private readonly InfoStream _logger; private readonly IndexWriter _writer; private readonly MergePolicy.OneMerge _startingMerge; @@ -447,7 +460,7 @@ namespace Lucene.Net.Index /// Sole constructor. </summary> public MergeThread(string name, IndexWriter writer, MergePolicy.OneMerge startMerge, InfoStream logger, bool isLoggingEnabled, - ManualResetEventSlim resetEvent, Action<Exception> exceptionHandler) + ManualResetEventSlim resetEvent, Action<Exception> exceptionHandler, Action<MergePolicy.OneMerge> doMerge) { Name = name; _cancellationTokenSource = new CancellationTokenSource(); @@ -457,6 +470,7 @@ namespace Lucene.Net.Index _isLoggingEnabled = isLoggingEnabled; _resetEvent = resetEvent; _exceptionHandler = exceptionHandler; + _doMerge = doMerge; } public string Name { get; private set; } @@ -581,7 +595,10 @@ namespace Lucene.Net.Index while (true && !cancellationToken.IsCancellationRequested) { RunningMerge = merge; - _writer.Merge(merge); + // LUCENENET NOTE: We MUST call DoMerge(merge) instead of + // _writer.Merge(merge) because the tests specifically look + // for the method name DoMerge in the stack trace. + _doMerge(merge); // Subsequent times through the loop we do any new // merge that writer says is necessary: diff --git a/src/Lucene.Net/Support/Threading/ThreadClass.cs b/src/Lucene.Net/Support/Threading/ThreadClass.cs index f959a4a..a84de7a 100644 --- a/src/Lucene.Net/Support/Threading/ThreadClass.cs +++ b/src/Lucene.Net/Support/Threading/ThreadClass.cs @@ -19,13 +19,19 @@ * */ +using Lucene.Net.Util; using System; using System.Threading; namespace Lucene.Net.Support.Threading { /// <summary> - /// Support class used to handle threads + /// Support class used to handle threads that is + /// inheritable just like the Thread type in Java. + /// This class also ensures that when an error is thrown + /// on a background thread, it will be properly re-thrown + /// on the calling thread, which is the same behavior + /// as we see in Lucene. /// </summary> public class ThreadClass : IThreadRunnable { @@ -35,11 +41,19 @@ namespace Lucene.Net.Support.Threading private Thread _threadField; /// <summary> + /// The exception (if any) caught on the running thread + /// that will be re-thrown on the calling thread after + /// calling <see cref="Join()"/>, <see cref="Join(long)"/>, + /// or <see cref="Join(long, int)"/>. + /// </summary> + private Exception _exception; + + /// <summary> /// Initializes a new instance of the ThreadClass class /// </summary> public ThreadClass() { - _threadField = new Thread(Run); + _threadField = new Thread(() => SafeRun(Run)); } /// <summary> @@ -48,7 +62,7 @@ namespace Lucene.Net.Support.Threading /// <param name="name">The name of the thread</param> public ThreadClass(string name) { - _threadField = new Thread(Run); + _threadField = new Thread(() => SafeRun(Run)); this.Name = name; } @@ -58,7 +72,7 @@ namespace Lucene.Net.Support.Threading /// <param name="start">A ThreadStart delegate that references the methods to be invoked when this thread begins executing</param> public ThreadClass(ThreadStart start) { - _threadField = new Thread(start); + _threadField = new Thread(() => SafeRun(start)); } /// <summary> @@ -68,11 +82,45 @@ namespace Lucene.Net.Support.Threading /// <param name="name">The name of the thread</param> public ThreadClass(ThreadStart start, string name) { - _threadField = new Thread(start); + _threadField = new Thread(() => SafeRun(start)); this.Name = name; } /// <summary> + /// Safely starts the method passed to <paramref name="start"/> and stores any exception that is + /// thrown. The first exception will stop execution of the method passed to <paramref name="start"/> + /// and it will be re-thrown on the calling thread after it calls <see cref="Join()"/>, + /// <see cref="Join(long)"/>, or <see cref="Join(long, int)"/>. + /// </summary> + /// <param name="start">A <see cref="ThreadStart"/> delegate that references the methods to be invoked when this thread begins executing</param> + protected virtual void SafeRun(ThreadStart start) + { + try + { + start.Invoke(); + } + catch (Exception ex) when (!IsThreadingException(ex)) + { + // LUCENENET NOTE: We are intentionally not using an + // AggregateException type here because we want to make + // sure that the unwrapped exception type is caught by Lucene.Net + // so it can handle the control flow accordingly. + _exception = ex; + } + } + + private bool IsThreadingException(Exception e) + { + return +#if !NETSTANDARD1_6 + e.GetType().Equals(typeof(ThreadInterruptedException)) || + e.GetType().Equals(typeof(ThreadAbortException)); +#else + false; +#endif + } + + /// <summary> /// This method has no functionality unless the method is overridden /// </summary> public virtual void Run() @@ -193,6 +241,7 @@ namespace Lucene.Net.Support.Threading public void Join() { _threadField.Join(); + IOUtils.ReThrowUnchecked(_exception); } /// <summary> @@ -202,6 +251,7 @@ namespace Lucene.Net.Support.Threading public void Join(long milliSeconds) { _threadField.Join(Convert.ToInt32(milliSeconds)); + IOUtils.ReThrowUnchecked(_exception); } /// <summary> @@ -214,6 +264,7 @@ namespace Lucene.Net.Support.Threading int totalTime = Convert.ToInt32(milliSeconds + (nanoSeconds*0.000001)); _threadField.Join(totalTime); + IOUtils.ReThrowUnchecked(_exception); } /// <summary>
