http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/Lucene.Net.Replicator.csproj ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/Lucene.Net.Replicator.csproj b/src/Lucene.Net.Replicator/Lucene.Net.Replicator.csproj new file mode 100644 index 0000000..9481bd4 --- /dev/null +++ b/src/Lucene.Net.Replicator/Lucene.Net.Replicator.csproj @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{1F70D2DB-C1B3-4F78-9598-3E04E0C7EB06}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Lucene.Net.Replicator</RootNamespace> + <AssemblyName>Lucene.Net.Replicator</AssemblyName> + <TargetFrameworkVersion>v4.5.1</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\Debug\</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\Release\</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Newtonsoft.Json, Version=9.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL"> + <HintPath>..\..\packages\Newtonsoft.Json.9.0.1\lib\net45\Newtonsoft.Json.dll</HintPath> + <Private>True</Private> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Net.Http" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="ComponentWrapperInfoStream.cs" /> + <Compile Include="Http\EnumerableExtensions.cs" /> + <Compile Include="Http\HttpClientBase.cs" /> + <Compile Include="Http\HttpReplicator.cs" /> + <Compile Include="Http\Abstractions\IReplicationRequest.cs" /> + <Compile Include="Http\Abstractions\IReplicationResponse.cs" /> + <Compile Include="Http\ReplicationService.cs" /> + <Compile Include="IndexAndTaxonomyReplicationHandler.cs" /> + <Compile Include="IndexAndTaxonomyRevision.cs" /> + <Compile Include="IndexInputInputStream.cs" /> + <Compile Include="IndexReplicationHandler.cs" /> + <Compile Include="IndexRevision.cs" /> + <Compile Include="IReplicationHandler.cs" /> + <Compile Include="ISourceDirectoryFactory.cs" /> + <Compile Include="LocalReplicator.cs" /> + <Compile Include="PerSessionDirectoryFactory.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="ReplicationClient.cs" /> + <Compile Include="Replicator.cs" /> + <Compile Include="Revision.cs" /> + <Compile Include="RevisionFile.cs" /> + <Compile Include="SessionExpiredException.cs" /> + <Compile Include="SessionToken.cs" /> + </ItemGroup> + <ItemGroup> + <Content Include="Http\package.html" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\Lucene.Net.Facet\Lucene.Net.Facet.csproj"> + <Project>{48f7884a-9454-4e88-8413-9d35992cb440}</Project> + <Name>Lucene.Net.Facet</Name> + </ProjectReference> + <ProjectReference Include="..\Lucene.Net\Lucene.Net.csproj"> + <Project>{5d4ad9be-1ffb-41ab-9943-25737971bf57}</Project> + <Name>Lucene.Net</Name> + </ProjectReference> + </ItemGroup> + <ItemGroup> + <None Include="packages.config" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> +</Project> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/PerSessionDirectoryFactory.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/PerSessionDirectoryFactory.cs b/src/Lucene.Net.Replicator/PerSessionDirectoryFactory.cs new file mode 100644 index 0000000..e7f1d80 --- /dev/null +++ b/src/Lucene.Net.Replicator/PerSessionDirectoryFactory.cs @@ -0,0 +1,96 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.IO; +using Lucene.Net.Store; +using Directory = Lucene.Net.Store.Directory; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// A <see cref="ISourceDirectoryFactory"/> which returns <see cref="FSDirectory"/> under a + /// dedicated session directory. When a session is over, the entire directory is + /// deleted. + /// </summary> + /// <remarks> + /// Lucene.Experimental + /// </remarks> + public class PerSessionDirectoryFactory : ISourceDirectoryFactory + { + #region Java + //JAVA: private final File workDir; + #endregion + private readonly string workingDirectory; + + /** Constructor with the given sources mapping. */ + public PerSessionDirectoryFactory(string workingDirectory) + { + this.workingDirectory = workingDirectory; + } + + public Directory GetDirectory(string sessionId, string source) + { + #region Java + //JAVA: public Directory getDirectory(String sessionID, String source) throws IOException { + //JAVA: File sessionDir = new File(workDir, sessionID); + //JAVA: if (!sessionDir.exists() && !sessionDir.mkdirs()) { + //JAVA: throw new IOException("failed to create session directory " + sessionDir); + //JAVA: } + //JAVA: File sourceDir = new File(sessionDir, source); + //JAVA: if (!sourceDir.mkdirs()) { + //JAVA: throw new IOException("failed to create source directory " + sourceDir); + //JAVA: } + //JAVA: return FSDirectory.open(sourceDir); + //JAVA: } + #endregion + + string sourceDirectory = Path.Combine(workingDirectory, sessionId, source); + System.IO.Directory.CreateDirectory(sourceDirectory); + return FSDirectory.Open(sourceDirectory); + } + + public void CleanupSession(string sessionId) + { + if (string.IsNullOrEmpty(sessionId)) throw new ArgumentException("sessionID cannot be empty", "sessionId"); + + #region Java + //JAVA: rm(new File(workDir, sessionID)); + #endregion + + string sessionDirectory = Path.Combine(workingDirectory, sessionId); + System.IO.Directory.Delete(sessionDirectory, true); + } + + #region Java + //JAVA: private void rm(File file) throws IOException { + //JAVA: if (file.isDirectory()) { + //JAVA: for (File f : file.listFiles()) { + //JAVA: rm(f); + //JAVA: } + //JAVA: } + //JAVA: + //JAVA: // This should be either an empty directory, or a file + //JAVA: if (!file.delete() && file.exists()) { + //JAVA: throw new IOException("failed to delete " + file); + //JAVA: } + //JAVA: } + #endregion + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/Properties/AssemblyInfo.cs b/src/Lucene.Net.Replicator/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..898ca18 --- /dev/null +++ b/src/Lucene.Net.Replicator/Properties/AssemblyInfo.cs @@ -0,0 +1,24 @@ +using System; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Lucene.Net.Replicator")] +[assembly: AssemblyDescription("Replicator that allows replication of Lucene.Net files between a server and client(s) " + + "for the Lucene.Net full - text search engine library from The Apache Software Foundation.")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyDefaultAlias("Lucene.Net.Replicator")] +[assembly: AssemblyCulture("")] + +[assembly: CLSCompliant(true)] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("1f70d2db-c1b3-4f78-9598-3e04e0c7eb06")] http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/ReplicationClient.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/ReplicationClient.cs b/src/Lucene.Net.Replicator/ReplicationClient.cs new file mode 100644 index 0000000..63837c9 --- /dev/null +++ b/src/Lucene.Net.Replicator/ReplicationClient.cs @@ -0,0 +1,673 @@ +//STATUS: DRAFT - 4.8.0 +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using Lucene.Net.Store; +using Lucene.Net.Support; +using Lucene.Net.Support.Threading; +using Lucene.Net.Util; +using Directory = Lucene.Net.Store.Directory; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// A client which monitors and obtains new revisions from a <see cref="IReplicator"/>. + /// It can be used to either periodically check for updates by invoking + /// <see cref="StartUpdateThread"/>, or manually by calling <see cref="UpdateNow"/>. + /// <para> + /// Whenever a new revision is available, the <see cref="RequiredFiles"/> are + /// copied to the <see cref="Directory"/> specified by <see cref="PerSessionDirectoryFactory"/> and + /// a handler is notified. + /// </para> + /// </summary> + /// <remarks> + /// Lucene.Experimental + /// </remarks> + public partial class ReplicationClient : IDisposable + { + /// <summary> + /// The component name to use with <see cref="Util.InfoStream.IsEnabled"/> + /// </summary> + public const string INFO_STREAM_COMPONENT = "ReplicationThread"; + + /// <summary> Gets or sets the <see cref="Util.InfoStream"/> to use for logging messages. </summary> + public InfoStream InfoStream + { + get { return infoStream; } + set { infoStream = value ?? InfoStream.NO_OUTPUT; } + } + + private readonly IReplicator replicator; + private readonly IReplicationHandler handler; + private readonly ISourceDirectoryFactory factory; + + private readonly byte[] copyBuffer = new byte[16384]; + private readonly ReentrantLock updateLock = new ReentrantLock(); + + private ReplicationThread updateThread; + private bool disposed = false; + private InfoStream infoStream = InfoStream.Default; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="replicator">The <see cref="IReplicator"/> used for checking for updates</param> + /// <param name="handler">The <see cref="IReplicationHandler"/> notified when new revisions are ready</param> + /// <param name="factory">The <see cref="ISourceDirectoryFactory"/> for returning a <see cref="Directory"/> for a given source and session</param> + public ReplicationClient(IReplicator replicator, IReplicationHandler handler, ISourceDirectoryFactory factory) + { + this.replicator = replicator; + this.handler = handler; + this.factory = factory; + } + + /// <exception cref="IOException"></exception> + private void DoUpdate() + { + #region Java + //JAVA: private void doUpdate() throws IOException { + //JAVA: SessionToken session = null; + //JAVA: final Map<String,Directory> sourceDirectory = new HashMap<>(); + //JAVA: final Map<String,List<String>> copiedFiles = new HashMap<>(); + //JAVA: boolean notify = false; + //JAVA: try { + //JAVA: final String version = handler.currentVersion(); + //JAVA: session = replicator.checkForUpdate(version); + //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) { + //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): handlerVersion=" + version + " session=" + session); + //JAVA: } + //JAVA: if (session == null) { + //JAVA: // already up to date + //JAVA: return; + //JAVA: } + //JAVA: Map<String,List<RevisionFile>> requiredFiles = requiredFiles(session.sourceFiles); + //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) { + //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): requiredFiles=" + requiredFiles); + //JAVA: } + //JAVA: for (Entry<String,List<RevisionFile>> e : requiredFiles.entrySet()) { + //JAVA: String source = e.getKey(); + //JAVA: Directory dir = factory.getDirectory(session.id, source); + //JAVA: sourceDirectory.put(source, dir); + //JAVA: List<String> cpFiles = new ArrayList<>(); + //JAVA: copiedFiles.put(source, cpFiles); + //JAVA: for (RevisionFile file : e.getValue()) { + //JAVA: if (closed) { + //JAVA: // if we're closed, abort file copy + //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) { + //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): detected client was closed); abort file copy"); + //JAVA: } + //JAVA: return; + //JAVA: } + //JAVA: InputStream in = null; + //JAVA: IndexOutput out = null; + //JAVA: try { + //JAVA: in = replicator.obtainFile(session.id, source, file.fileName); + //JAVA: out = dir.createOutput(file.fileName, IOContext.DEFAULT); + //JAVA: copyBytes(out, in); + //JAVA: cpFiles.add(file.fileName); + //JAVA: // TODO add some validation, on size / checksum + //JAVA: } finally { + //JAVA: IOUtils.close(in, out); + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: // only notify if all required files were successfully obtained. + //JAVA: notify = true; + //JAVA: } finally { + //JAVA: if (session != null) { + //JAVA: try { + //JAVA: replicator.release(session.id); + //JAVA: } finally { + //JAVA: if (!notify) { // cleanup after ourselves + //JAVA: IOUtils.close(sourceDirectory.values()); + //JAVA: factory.cleanupSession(session.id); + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: + //JAVA: // notify outside the try-finally above, so the session is released sooner. + //JAVA: // the handler may take time to finish acting on the copied files, but the + //JAVA: // session itself is no longer needed. + //JAVA: try { + //JAVA: if (notify && !closed ) { // no use to notify if we are closed already + //JAVA: handler.revisionReady(session.version, session.sourceFiles, copiedFiles, sourceDirectory); + //JAVA: } + //JAVA: } finally { + //JAVA: IOUtils.close(sourceDirectory.values()); + //JAVA: if (session != null) { + //JAVA: factory.cleanupSession(session.id); + //JAVA: } + //JAVA: } + //JAVA: } + #endregion + + SessionToken session = null; + Dictionary<string, Directory> sourceDirectory = new Dictionary<string, Directory>(); + Dictionary<string, IList<string>> copiedFiles = new Dictionary<string, IList<string>>(); + bool notify = false; + try + { + string version = handler.CurrentVersion; + session = replicator.CheckForUpdate(version); + + WriteToInfoStream(string.Format("doUpdate(): handlerVersion={0} session={1}", version, session)); + + if (session == null) + return; + + IDictionary<string, IList<RevisionFile>> requiredFiles = RequiredFiles(session.SourceFiles); + WriteToInfoStream(string.Format("doUpdate(): handlerVersion={0} session={1}", version, session)); + + foreach (KeyValuePair<string, IList<RevisionFile>> pair in requiredFiles) + { + string source = pair.Key; + Directory directory = factory.GetDirectory(session.Id, source); + + sourceDirectory.Add(source, directory); + List<string> cpFiles = new List<string>(); + copiedFiles.Add(source, cpFiles); + foreach (RevisionFile file in pair.Value) + { + if (disposed) + { + // if we're closed, abort file copy + WriteToInfoStream("doUpdate(): detected client was closed); abort file copy"); + return; + } + + Stream input = null; + IndexOutput output = null; + try + { + input = replicator.ObtainFile(session.Id, source, file.FileName); + output = directory.CreateOutput(file.FileName, IOContext.DEFAULT); + + CopyBytes(output, input); + + cpFiles.Add(file.FileName); + // TODO add some validation, on size / checksum + } + finally + { + IOUtils.Dispose(input, output); + } + } + // only notify if all required files were successfully obtained. + notify = true; + } + } + finally + { + if (session != null) + { + try + { + replicator.Release(session.Id); + } + finally + { + if (!notify) + { + // cleanup after ourselves + IOUtils.Dispose(sourceDirectory.Values); + factory.CleanupSession(session.Id); + } + } + } + } + + // notify outside the try-finally above, so the session is released sooner. + // the handler may take time to finish acting on the copied files, but the + // session itself is no longer needed. + try + { + if (notify && !disposed) + { // no use to notify if we are closed already + handler.RevisionReady(session.Version, session.SourceFiles, new ReadOnlyDictionary<string, IList<string>>(copiedFiles), sourceDirectory); + } + } + finally + { + IOUtils.Dispose(sourceDirectory.Values); + //TODO: Resharper Message, Expression is always true -> Verify and if so then we can remove the null check. + if (session != null) + { + factory.CleanupSession(session.Id); + } + } + + } + + /// <exception cref="IOException"></exception> + private void CopyBytes(IndexOutput output, Stream input) + { + int numBytes; + while ((numBytes = input.Read(copyBuffer, 0, copyBuffer.Length)) > 0) { + output.WriteBytes(copyBuffer, 0, numBytes); + } + } + + //.NET Note: Utility Method + private void WriteToInfoStream(string message) + { + if (infoStream.IsEnabled(INFO_STREAM_COMPONENT)) + infoStream.Message(INFO_STREAM_COMPONENT, message); + } + + /// <summary> + /// Returns the files required for replication. By default, this method returns + /// all files that exist in the new revision, but not in the handler. + /// </summary> + /// <param name="newRevisionFiles"></param> + /// <returns></returns> + private IDictionary<string, IList<RevisionFile>> RequiredFiles(IDictionary<string, IList<RevisionFile>> newRevisionFiles) + { + #region Java + //JAVA: protected Map<String,List<RevisionFile>> requiredFiles(Map<String,List<RevisionFile>> newRevisionFiles) { + //JAVA: Map<String,List<RevisionFile>> handlerRevisionFiles = handler.currentRevisionFiles(); + //JAVA: if (handlerRevisionFiles == null) { + //JAVA: return newRevisionFiles; + //JAVA: } + //JAVA: + //JAVA: Map<String,List<RevisionFile>> requiredFiles = new HashMap<>(); + //JAVA: for (Entry<String,List<RevisionFile>> e : handlerRevisionFiles.entrySet()) { + //JAVA: // put the handler files in a Set, for faster contains() checks later + //JAVA: Set<String> handlerFiles = new HashSet<>(); + //JAVA: for (RevisionFile file : e.getValue()) { + //JAVA: handlerFiles.add(file.fileName); + //JAVA: } + //JAVA: + //JAVA: // make sure to preserve revisionFiles order + //JAVA: ArrayList<RevisionFile> res = new ArrayList<>(); + //JAVA: String source = e.getKey(); + //JAVA: assert newRevisionFiles.containsKey(source) : "source not found in newRevisionFiles: " + newRevisionFiles; + //JAVA: for (RevisionFile file : newRevisionFiles.get(source)) { + //JAVA: if (!handlerFiles.contains(file.fileName)) { + //JAVA: res.add(file); + //JAVA: } + //JAVA: } + //JAVA: requiredFiles.put(source, res); + //JAVA: } + //JAVA: + //JAVA: return requiredFiles; + //JAVA: } + #endregion + + IDictionary<string, IList<RevisionFile>> handlerRevisionFiles = handler.CurrentRevisionFiles; + if (handlerRevisionFiles == null) + return newRevisionFiles; + + Dictionary<string, IList<RevisionFile>> requiredFiles = new Dictionary<string, IList<RevisionFile>>(); + foreach (KeyValuePair<string, IList<RevisionFile>> pair in handlerRevisionFiles) + { + // put the handler files in a Set, for faster contains() checks later + HashSet<string> handlerFiles = new HashSet<string>(pair.Value.Select(v => v.FileName)); + + // make sure to preserve revisionFiles order + string source = pair.Key; + Debug.Assert(newRevisionFiles.ContainsKey(source), string.Format("source not found in newRevisionFiles: {0}", newRevisionFiles)); + List<RevisionFile> res = newRevisionFiles[source] + .Where(file => !handlerFiles.Contains(file.FileName)) + .ToList(); + requiredFiles.Add(source, res); + } + return requiredFiles; + } + + /// <summary> + /// Start the update thread with the specified interval in milliseconds. For + /// debugging purposes, you can optionally set the name to set on + /// <see cref="ReplicationThread.Name"/>. If you pass <code>null</code>, a default name + /// will be set. + /// </summary> + /// <exception cref="InvalidOperationException"> if the thread has already been started </exception> + public void StartUpdateThread(long intervalMillis, string threadName) + { + #region Java + //JAVA: public synchronized void startUpdateThread(long intervalMillis, String threadName) { + //JAVA: ensureOpen(); + //JAVA: if (updateThread != null && updateThread.isAlive()) { + //JAVA: throw new IllegalStateException( + //JAVA: "cannot start an update thread when one is running, must first call 'stopUpdateThread()'"); + //JAVA: } + //JAVA: threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName; + //JAVA: updateThread = new ReplicationThread(intervalMillis); + //JAVA: updateThread.setName(threadName); + //JAVA: updateThread.start(); + //JAVA: // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side + //JAVA: assert updateThread.isAlive() : "updateThread started but not alive?"; + //JAVA: } + #endregion + + EnsureOpen(); + if (updateThread != null && updateThread.IsAlive) + throw new InvalidOperationException("cannot start an update thread when one is running, must first call 'stopUpdateThread()'"); + + threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName; + updateThread = new ReplicationThread(intervalMillis, threadName, DoUpdate, HandleUpdateException, updateLock); + updateThread.Start(); + // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side + Debug.Assert(updateThread.IsAlive, "updateThread started but not alive?"); + } + + /// <summary> + /// Stop the update thread. If the update thread is not running, silently does + /// nothing. This method returns after the update thread has stopped. + /// </summary> + public void StopUpdateThread() + { + #region Java + //JAVA: public synchronized void stopUpdateThread() { + //JAVA: if (updateThread != null) { + //JAVA: // this will trigger the thread to terminate if it awaits the lock. + //JAVA: // otherwise, if it's in the middle of replication, we wait for it to + //JAVA: // stop. + //JAVA: updateThread.stop.countDown(); + //JAVA: try { + //JAVA: updateThread.join(); + //JAVA: } catch (InterruptedException e) { + //JAVA: Thread.currentThread().interrupt(); + //JAVA: throw new ThreadInterruptedException(e); + //JAVA: } + //JAVA: updateThread = null; + //JAVA: } + //JAVA: } + #endregion + + // this will trigger the thread to terminate if it awaits the lock. + // otherwise, if it's in the middle of replication, we wait for it to + // stop. + if (updateThread != null) + updateThread.Stop(); + updateThread = null; + } + + /// <summary> + /// Returns true if the update thread is alive. The update thread is alive if + /// it has been <see cref="StartUpdateThread"/> and not + /// <see cref="StopUpdateThread"/>, as well as didn't hit an error which + /// caused it to terminate (i.e. <see cref="HandleUpdateException"/> + /// threw the exception further). + /// </summary> + public bool IsUpdateThreadAlive + { + get { return updateThread != null && updateThread.IsAlive; } + } + + /// <summary>Throws <see cref="ObjectDisposedException"/> if the client has already been disposed.</summary> + protected virtual void EnsureOpen() + { + if (!disposed) + return; + + throw new ObjectDisposedException("this update client has already been closed"); + } + + /// <summary> + /// Called when an exception is hit by the replication thread. The default + /// implementation prints the full stacktrace to the <seealso cref="InfoStream"/> set in + /// <seealso cref="InfoStream"/>, or the <see cref="Util.InfoStream.Default"/> + /// one. You can override to log the exception elswhere. + /// </summary> + /// <remarks> + /// If you override this method to throw the exception further, + /// the replication thread will be terminated. The only way to restart it is to + /// call <seealso cref="StopUpdateThread"/> followed by + /// <seealso cref="StartUpdateThread"/>. + /// </remarks> + protected virtual void HandleUpdateException(Exception exception) + { + WriteToInfoStream(string.Format("an error occurred during revision update: {0}", exception)); + } + + /// <summary> + /// Executes the update operation immediately, irregardess if an update thread + /// is running or not. + /// </summary> + /// <exception cref="IOException"></exception> + public void UpdateNow() + { + EnsureOpen(); + if (updateThread != null) + { + //NOTE: We have a worker running, we use that to perform the work instead by requesting it to run + // it's cycle immidiately. + updateThread.ExecuteImmediately(); + return; + } + + //NOTE: We don't have a worker running, so we just do the work. + updateLock.Lock(); + try + { + DoUpdate(); + } + finally + { + updateLock.Unlock(); + } + } + + protected virtual void Dispose(bool disposing) + { + if (disposed) + return; + + StopUpdateThread(); + disposed = true; + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public override string ToString() + { + if (updateThread == null) + return "ReplicationClient"; + return string.Format("ReplicationClient ({0})", updateThread.Name); + } + + //Note: LUCENENET specific, .NET does not work with Threads in the same way as Java does, so we mimic the same behavior using the ThreadPool instead. + private class ReplicationThread + { + #region Java + //JAVA: private class ReplicationThread extends Thread { + //JAVA: private final long interval; + //JAVA: // client uses this to stop us + //JAVA: final CountDownLatch stop = new CountDownLatch(1); + //JAVA: + //JAVA: public ReplicationThread(long interval) { + //JAVA: this.interval = interval; + //JAVA: } + //JAVA: + //JAVA: @SuppressWarnings("synthetic-access") + //JAVA: @Override + //JAVA: public void run() { + //JAVA: while (true) { + //JAVA: long time = System.currentTimeMillis(); + //JAVA: updateLock.lock(); + //JAVA: try { + //JAVA: doUpdate(); + //JAVA: } catch (Throwable t) { + //JAVA: handleUpdateException(t); + //JAVA: } finally { + //JAVA: updateLock.unlock(); + //JAVA: } + //JAVA: time = System.currentTimeMillis() - time; + //JAVA: + //JAVA: // adjust timeout to compensate the time spent doing the replication. + //JAVA: final long timeout = interval - time; + //JAVA: if (timeout > 0) { + //JAVA: try { + //JAVA: // this will return immediately if we were ordered to stop (count=0) + //JAVA: // or the timeout has elapsed. if it returns true, it means count=0, + //JAVA: // so terminate. + //JAVA: if (stop.await(timeout, TimeUnit.MILLISECONDS)) { + //JAVA: return; + //JAVA: } + //JAVA: } catch (InterruptedException e) { + //JAVA: // if we were interruted, somebody wants to terminate us, so just + //JAVA: // throw the exception further. + //JAVA: Thread.currentThread().interrupt(); + //JAVA: throw new ThreadInterruptedException(e); + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: } + #endregion + + private readonly Action doUpdate; + private readonly Action<Exception> handleException; + private readonly ReentrantLock @lock; + private readonly object controlLock = new object(); + + private readonly long interval; + private readonly AutoResetEvent handle = new AutoResetEvent(false); + + private AutoResetEvent stopHandle; + + /// <summary> + /// Gets or sets the name + /// </summary> + public string Name { get; private set; } + + /// <summary> + /// + /// </summary> + /// <param name="intervalMillis"></param> + /// <param name="threadName"></param> + /// <param name="doUpdate"></param> + /// <param name="handleException"></param> + /// <param name="lock"></param> + public ReplicationThread(long intervalMillis, string threadName, Action doUpdate, Action<Exception> handleException, ReentrantLock @lock) + { + this.doUpdate = doUpdate; + this.handleException = handleException; + this.@lock = @lock; + Name = threadName; + this.interval = intervalMillis; + } + + /// <summary> + /// + /// </summary> + public bool IsAlive { get; private set; } + + /// <summary> + /// + /// </summary> + public void Start() + { + lock (controlLock) + { + if (IsAlive) + return; + IsAlive = true; + } + RegisterWait(interval); + } + + /// <summary> + /// + /// </summary> + public void Stop() + { + lock (controlLock) + { + if (!IsAlive) + return; + IsAlive = false; + } + stopHandle = new AutoResetEvent(false); + + //NOTE: Execute any outstanding, this execution will terminate almost instantaniously if it's not already running. + ExecuteImmediately(); + + stopHandle.WaitOne(); + stopHandle = null; + } + + /// <summary> + /// Executes the next cycle of work immediately + /// </summary> + public void ExecuteImmediately() + { + handle.Set(); + } + + private void RegisterWait(long timeout) + { + //NOTE: We don't care about timedout as it can either be because we was requested to run immidiately or stop. + if (IsAlive) + ThreadPool.RegisterWaitForSingleObject(handle, (state, timedout) => Run(), null, timeout, true); + else + SignalStop(); + } + + private void SignalStop() + { + if (stopHandle != null) + stopHandle.Set(); + } + + private void Run() + { + if (!IsAlive) + { + SignalStop(); + return; + } + + Stopwatch timer = Stopwatch.StartNew(); + @lock.Lock(); + try + { + doUpdate(); + } + catch (Exception exception) + { + handleException(exception); + } + finally + { + @lock.Unlock(); + + timer.Stop(); + long driftAdjusted = Math.Max(interval - timer.ElapsedMilliseconds, 0); + if (IsAlive) + RegisterWait(driftAdjusted); + else + SignalStop(); + } + } + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/Replicator.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/Replicator.cs b/src/Lucene.Net.Replicator/Replicator.cs new file mode 100644 index 0000000..af7ef51 --- /dev/null +++ b/src/Lucene.Net.Replicator/Replicator.cs @@ -0,0 +1,91 @@ +//STATUS: DRAFT - 4.8.0 +using System; +using System.IO; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// An interface for replicating files. Allows a producer to + /// <see cref="Publish"/> <see cref="IRevision"/>s and consumers to + /// <see cref="CheckForUpdate"/>. When a client needs to be + /// updated, it is given a <see cref="SessionToken"/> through which it can + /// <see cref="ObtainFile"/> the files of that + /// revision. After the client has finished obtaining all the files, it should + /// <see cref="Release"/> the given session, so that the files can be + /// reclaimed if they are not needed anymore. + /// <p> + /// A client is always updated to the newest revision available. That is, if a + /// client is on revision <em>r1</em> and revisions <em>r2</em> and <em>r3</em> + /// were published, then when the cllient will next check for update, it will + /// receive <em>r3</em>. + /// </p> + /// </summary> + /// <remarks> + /// Lucene.Experimental + /// </remarks> + public interface IReplicator : IDisposable + { + /// <summary> + /// Publish a new <see cref="IRevision"/> for consumption by clients. It is the + /// caller's responsibility to verify that the revision files exist and can be + /// read by clients. When the revision is no longer needed, it will be + /// <see cref="Release"/>d by the replicator. + /// </summary> + /// <param name="revision">The <see cref="IRevision"/> to publish.</param> + /// <exception cref="IOException"></exception> + void Publish(IRevision revision); + + /// <summary> + /// Check whether the given version is up-to-date and returns a + /// <see cref="SessionToken"/> which can be used for fetching the revision files, + /// otherwise returns <code>null</code>. + /// </summary> + /// <remarks> + /// When the returned session token is no longer needed, you + /// should call <see cref="Release"/> so that the session resources can be + /// reclaimed, including the revision files. + /// </remarks> + /// <param name="currentVersion"></param> + /// <returns></returns> + /// <exception cref="IOException"></exception> + SessionToken CheckForUpdate(string currentVersion);// throws IOException; + + /// <summary> + /// Notify that the specified <see cref="SessionToken"/> is no longer needed by the caller. + /// </summary> + /// <param name="sessionId"></param> + /// <exception cref="IOException"></exception> + void Release(string sessionId); + + /// <summary> + /// Returns an <see cref="Stream"/> for the requested file and source in the + /// context of the given <see cref="SessionToken.Id"/>. + /// </summary> + /// <remarks> + /// It is the caller's responsibility to call <see cref="IDisposable.Dispose"/> on the returned stream. + /// </remarks> + /// <param name="sessionId"></param> + /// <param name="source"></param> + /// <param name="fileName"></param> + /// <returns></returns> + /// <exception cref="SessionExpiredException">The specified session has already expired</exception> + Stream ObtainFile(string sessionId, string source, string fileName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/Revision.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/Revision.cs b/src/Lucene.Net.Replicator/Revision.cs new file mode 100644 index 0000000..3d6fe19 --- /dev/null +++ b/src/Lucene.Net.Replicator/Revision.cs @@ -0,0 +1,81 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.Collections.Generic; +using System.IO; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// A revision comprises lists of files that come from different sources and need + /// to be replicated together to e.g. guarantee that all resources are in sync. + /// In most cases an application will replicate a single index, and so the + /// revision will contain files from a single source. However, some applications + /// may require to treat a collection of indexes as a single entity so that the + /// files from all sources are replicated together, to guarantee consistency + /// beween them. For example, an application which indexes facets will need to + /// replicate both the search and taxonomy indexes together, to guarantee that + /// they match at the client side. + /// </summary> + /// <remarks> + /// Lucene.Experimental + /// </remarks> + public interface IRevision : IComparable<IRevision> + { + /// <summary> + /// Returns a string representation of the version of this revision. The + /// version is used by <see cref="CompareTo"/> as well as to + /// serialize/deserialize revision information. Therefore it must be self + /// descriptive as well as be able to identify one revision from another. + /// </summary> + string Version { get; } + + /// <summary> + /// Returns the files that comprise this revision, as a mapping from a source + /// to a list of files. + /// </summary> + IDictionary<string, IList<RevisionFile>> SourceFiles { get; } + + /// <summary> + /// Compares the revision to the given version string. Behaves like + /// <see cref="IComparable{T}.CompareTo"/> + /// </summary> + int CompareTo(string version); + + /// <summary> + /// Returns an {@link IndexInput} for the given fileName and source. It is the + /// caller's respnsibility to close the {@link IndexInput} when it has been + /// consumed. + /// </summary> + /// <param name="source"></param> + /// <param name="fileName"></param> + /// <returns></returns> + /// <exception cref="IOException"></exception> + //TODO: Stream or IndexInput? + Stream Open(string source, string fileName); + + /// <summary> + /// Called when this revision can be safely released, i.e. where there are no + /// more references to it. + /// </summary> + /// <exception cref="IOException"></exception> + void Release(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/RevisionFile.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/RevisionFile.cs b/src/Lucene.Net.Replicator/RevisionFile.cs new file mode 100644 index 0000000..abd7aff --- /dev/null +++ b/src/Lucene.Net.Replicator/RevisionFile.cs @@ -0,0 +1,87 @@ +//STATUS: DRAFT - 4.8.0 + +using System; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// Describes a file in a <see cref="IRevision"/>. A file has a source, which allows a + /// single revision to contain files from multiple sources (e.g. multiple indexes). + /// </summary> + /// <remarks> + /// Lucene.Experimental + /// </remarks> + public class RevisionFile : IEquatable<RevisionFile> + { + /// <summary> + /// Gets the name of the file. + /// </summary> + public string FileName { get; private set; } + + //TODO: can this be readonly? + /// <summary> + /// Gets or sets the length of the file denoted by <see cref="FileName"/>. + /// </summary> + public long Length { get; set; } + + /// <summary> + /// Constructor with the given file name and optionally length. + /// </summary> + /// <param name="fileName"></param> + /// <param name="length">Optional, the length of the file.</param> + public RevisionFile(string fileName, long length = -1) + { + if (string.IsNullOrEmpty(fileName)) throw new ArgumentException("fileName must not be null or empty", "fileName"); + + FileName = fileName; + Length = length; + } + + public override string ToString() + { + return string.Format("fileName={0} length={1}", FileName, Length); + } + + #region Resharper Generated Code + public bool Equals(RevisionFile other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return string.Equals(FileName, other.FileName) && Length == other.Length; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((RevisionFile)obj); + } + + public override int GetHashCode() + { + unchecked + { + return (FileName.GetHashCode() * 397) ^ Length.GetHashCode(); + } + } + #endregion + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/SessionExpiredException.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/SessionExpiredException.cs b/src/Lucene.Net.Replicator/SessionExpiredException.cs new file mode 100644 index 0000000..38eed6c --- /dev/null +++ b/src/Lucene.Net.Replicator/SessionExpiredException.cs @@ -0,0 +1,58 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.IO; +using System.Runtime.Serialization; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// Exception indicating that a revision update session was expired due to lack of activity. + /// </summary> + /// <remarks> + /// <see cref="LocalReplicator.DEFAULT_SESSION_EXPIRATION_THRESHOLD"/> + /// <see cref="LocalReplicator.ExpirationThreshold"/> + /// + /// Lucene.Experimental + /// </remarks> + public class SessionExpiredException : IOException + { + // + // For guidelines regarding the creation of new exception types, see + // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpgenref/html/cpconerrorraisinghandlingguidelines.asp + // and + // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dncscol/html/csharp07192001.asp + // + + public SessionExpiredException() + { + } + + public SessionExpiredException(string message) + : base(message) + { + } + + public SessionExpiredException(string message, Exception inner) + : base(message, inner) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/SessionToken.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/SessionToken.cs b/src/Lucene.Net.Replicator/SessionToken.cs new file mode 100644 index 0000000..a9440d7 --- /dev/null +++ b/src/Lucene.Net.Replicator/SessionToken.cs @@ -0,0 +1,129 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.IO; +using Lucene.Net.Store; +using Lucene.Net.Support.IO; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// Token for a replication session, for guaranteeing that source replicated + /// files will be kept safe until the replication completes. + /// </summary> + /// <remarks> + /// <see cref="IReplicator.CheckForUpdate"/> + /// <see cref="IReplicator.Release"/> + /// <see cref="LocalReplicator.DEFAULT_SESSION_EXPIRATION_THRESHOLD"/> + /// + /// Lucene.Experimental + /// </remarks> + public sealed class SessionToken + { + /// <summary> + /// Id of this session. + /// Should be passed when releasing the session, thereby acknowledging the + /// <see cref="IReplicator"/> that this session is no longer in use. + /// <see cref="IReplicator.Release"/> + /// </summary> + public string Id { get; private set; } + + /// <summary> + /// <see cref="IRevision.Version"/> + /// </summary> + public string Version { get; private set; } + + /// <summary> + /// <see cref="IRevision.SourceFiles"/> + /// </summary> + public IDictionary<string, IList<RevisionFile>> SourceFiles { get; private set; } + + /// <summary> + /// Constructor which deserializes from the given <see cref="DataInput"/>. + /// </summary> + /// <param name="reader"></param> + /// <exception cref="IOException"></exception> + public SessionToken(DataInputStream reader) + { + Id = reader.ReadUTF(); + Version = reader.ReadUTF(); + + var sourceFiles = new Dictionary<string, IList<RevisionFile>>(); + int numSources = reader.ReadInt32(); + while (numSources > 0) + { + string source = reader.ReadUTF(); + int numFiles = reader.ReadInt32(); + + List<RevisionFile> files = new List<RevisionFile>(numFiles); + for (int i = 0; i < numFiles; i++) + { + files.Add(new RevisionFile(reader.ReadUTF(), reader.ReadInt64())); + } + sourceFiles.Add(source, files); + --numSources; + } + SourceFiles = sourceFiles; + } + + /// <summary> + /// Constructor with the given id and revision. + /// </summary> + /// <param name="id"></param> + /// <param name="revision"></param> + /// <exception cref="IOException"></exception> + public SessionToken(string id, IRevision revision) + { + Id = id; + Version = revision.Version; + SourceFiles = revision.SourceFiles; + } + + /// <summary> + /// Serialize the token data for communication between server and client. + /// </summary> + /// <param name="writer"></param> + /// <exception cref="IOException"></exception> + public void Serialize(DataOutputStream writer) + { + writer.WriteUTF(Id); + writer.WriteUTF(Version); + writer.WriteInt32(SourceFiles.Count); + + foreach (KeyValuePair<string, IList<RevisionFile>> pair in SourceFiles) + { + writer.WriteUTF(pair.Key); + writer.WriteInt32(pair.Value.Count); + foreach (RevisionFile file in pair.Value) + { + writer.WriteUTF(file.FileName); + writer.WriteInt64(file.Length); + } + } + } + + public override string ToString() + { + return string.Format("id={0} version={1} files={2}", Id, Version, SourceFiles); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/packages.config ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/packages.config b/src/Lucene.Net.Replicator/packages.config new file mode 100644 index 0000000..3e14be6 --- /dev/null +++ b/src/Lucene.Net.Replicator/packages.config @@ -0,0 +1,4 @@ +<?xml version="1.0" encoding="utf-8"?> +<packages> + <package id="Newtonsoft.Json" version="9.0.1" targetFramework="net451" /> +</packages> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Tests.Replicator/Http/HttpReplicatorTest.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Tests.Replicator/Http/HttpReplicatorTest.cs b/src/Lucene.Net.Tests.Replicator/Http/HttpReplicatorTest.cs new file mode 100644 index 0000000..3d116f9 --- /dev/null +++ b/src/Lucene.Net.Tests.Replicator/Http/HttpReplicatorTest.cs @@ -0,0 +1,104 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using Lucene.Net.Documents; +using Lucene.Net.Index; +using Lucene.Net.Replicator; +using Lucene.Net.Replicator.Http; +using Lucene.Net.Support; +using Lucene.Net.Util; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.DependencyInjection; +using NUnit.Framework; +using Directory = Lucene.Net.Store.Directory; + +namespace Lucene.Net.Tests.Replicator.Http +{ + public class HttpReplicatorTest : ReplicatorTestCase + { + private DirectoryInfo clientWorkDir; + private IReplicator serverReplicator; + private IndexWriter writer; + private DirectoryReader reader; + + private int port; + private string host; + private TestServer server; + + private Directory serverIndexDir; + private Directory handlerIndexDir; + + private void StartServer() + { + ReplicationService service = new ReplicationService(new Dictionary<string, IReplicator> { { "s1", serverReplicator } }); + + server = NewHttpServer<ReplicationServlet>(service); + port = ServerPort(server); + host = ServerHost(server); + } + + public override void SetUp() + { + base.SetUp(); + //JAVA: System.setProperty("org.eclipse.jetty.LEVEL", "DEBUG"); // sets stderr logging to DEBUG level + clientWorkDir = CreateTempDir("httpReplicatorTest"); + handlerIndexDir = NewDirectory(); + serverIndexDir = NewDirectory(); + serverReplicator = new LocalReplicator(); + StartServer(); + + IndexWriterConfig conf = NewIndexWriterConfig(TEST_VERSION_CURRENT, null); + conf.IndexDeletionPolicy = new SnapshotDeletionPolicy(conf.IndexDeletionPolicy); + writer = new IndexWriter(serverIndexDir, conf); + reader = DirectoryReader.Open(writer, false); + } + + public override void TearDown() + { + StopHttpServer(server); + IOUtils.Dispose(reader, writer, handlerIndexDir, serverIndexDir); + //JAVA: System.clearProperty("org.eclipse.jetty.LEVEL"); + base.TearDown(); + } + + private void PublishRevision(int id) + { + Document doc = new Document(); + writer.AddDocument(doc); + writer.SetCommitData(Collections.SingletonMap("ID", id.ToString("X"))); + writer.Commit(); + serverReplicator.Publish(new IndexRevision(writer)); + } + + private void ReopenReader() + { + DirectoryReader newReader = DirectoryReader.OpenIfChanged(reader); + assertNotNull(newReader); + reader.Dispose(); + reader = newReader; + } + + + [Test] + public void TestBasic() + { + IReplicator replicator = new HttpReplicator(host, port, ReplicationService.REPLICATION_CONTEXT + "/s1", server.CreateHandler()); + ReplicationClient client = new ReplicationClient(replicator, new IndexReplicationHandler(handlerIndexDir, null), + new PerSessionDirectoryFactory(clientWorkDir.FullName)); + + PublishRevision(1); + client.UpdateNow(); + ReopenReader(); + assertEquals(1, int.Parse(reader.IndexCommit.UserData["ID"], NumberStyles.HexNumber)); + + PublishRevision(2); + client.UpdateNow(); + ReopenReader(); + assertEquals(2, int.Parse(reader.IndexCommit.UserData["ID"], NumberStyles.HexNumber)); + } + } +} http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Tests.Replicator/Http/ReplicationServlet.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Tests.Replicator/Http/ReplicationServlet.cs b/src/Lucene.Net.Tests.Replicator/Http/ReplicationServlet.cs new file mode 100644 index 0000000..63420a6 --- /dev/null +++ b/src/Lucene.Net.Tests.Replicator/Http/ReplicationServlet.cs @@ -0,0 +1,22 @@ +//STATUS: DRAFT - 4.8.0 + +using System.Threading.Tasks; +using Lucene.Net.Replicator.AspNetCore; +using Lucene.Net.Replicator.Http; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; + +namespace Lucene.Net.Tests.Replicator.Http +{ + public class ReplicationServlet + { + public void Configure(IApplicationBuilder app, IHostingEnvironment env, ReplicationService service) + { + app.Run(async context => + { + await Task.Yield(); + service.Perform(context.Request, context.Response); + }); + } + } +} \ No newline at end of file
