http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs b/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs
new file mode 100644
index 0000000..a9629b8
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexAndTaxonomyReplicationHandler.cs
@@ -0,0 +1,276 @@
//STATUS: DRAFT - 4.8.0

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Lucene.Net.Index;
using Lucene.Net.Store;
using Lucene.Net.Support;
using Lucene.Net.Util;
using Directory = Lucene.Net.Store.Directory;

namespace Lucene.Net.Replicator
{
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements. See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    /// <summary>
    /// A <see cref="IReplicationHandler"/> for replication of an index and taxonomy pair.
    /// See <see cref="IndexReplicationHandler"/> for more detail. This handler ensures
    /// that the search and taxonomy indexes are replicated in a consistent way.
    /// </summary>
    /// <remarks>
    /// <para>
    /// If you intend to recreate a taxonomy index, you should make sure
    /// to reopen an IndexSearcher and TaxonomyReader pair via the provided callback,
    /// to guarantee that both indexes are in sync. This handler does not prevent
    /// replicating such index and taxonomy pairs, and if they are reopened by a
    /// different thread, unexpected errors can occur, as well as inconsistency
    /// between the taxonomy and index readers.
    /// </para>
    /// <para>
    /// Lucene.Experimental
    /// </para>
    /// </remarks>
    /// <seealso cref="IndexReplicationHandler"/>
    public class IndexAndTaxonomyReplicationHandler : IReplicationHandler
    {
        /// <summary>
        /// The component name under which this handler logs messages to its
        /// <see cref="Lucene.Net.Util.InfoStream"/> (by default <see cref="Lucene.Net.Util.InfoStream.Default"/>).
        /// </summary>
        public const string INFO_STREAM_COMPONENT = "IndexAndTaxonomyReplicationHandler";

        private readonly Directory indexDirectory;
        private readonly Directory taxonomyDirectory;
        private readonly Func<bool?> callback;

        private InfoStream infoStream = InfoStream.Default;

        /// <summary>
        /// Version of the revision currently installed in the local indexes, or <c>null</c> when
        /// the indexes did not exist at construction time and no revision has been installed yet.
        /// </summary>
        public string CurrentVersion { get; private set; }

        /// <summary>
        /// Files of the currently installed revision: either computed by
        /// <see cref="IndexAndTaxonomyRevision.RevisionFiles"/> in the constructor, or the map passed to
        /// the last successful <see cref="RevisionReady"/> call. <c>null</c> until one of those happens.
        /// </summary>
        public IDictionary<string, IList<RevisionFile>> CurrentRevisionFiles { get; private set; }

        /// <summary>
        /// The <see cref="Lucene.Net.Util.InfoStream"/> used for diagnostic messages.
        /// Assigning <c>null</c> substitutes <see cref="Lucene.Net.Util.InfoStream.NO_OUTPUT"/>.
        /// </summary>
        public InfoStream InfoStream
        {
            get { return infoStream; }
            set { infoStream = value ?? InfoStream.NO_OUTPUT; }
        }

        /// <summary>
        /// Constructor with the given index and taxonomy directories and a callback to notify when
        /// the indexes were updated. If both indexes already exist, <see cref="CurrentVersion"/> and
        /// <see cref="CurrentRevisionFiles"/> are initialized from the last commit of each.
        /// </summary>
        /// <param name="indexDirectory">Directory of the search index that revisions are installed into.</param>
        /// <param name="taxonomyDirectory">Directory of the taxonomy index that revisions are installed into.</param>
        /// <param name="callback">Invoked after every successfully installed revision; may be <c>null</c>.</param>
        /// <exception cref="InvalidOperationException">Exactly one of the two indexes exists.</exception>
        /// <exception cref="IOException">An I/O error occurred while reading the existing commits.</exception>
        public IndexAndTaxonomyReplicationHandler(Directory indexDirectory, Directory taxonomyDirectory, Func<bool?> callback)
        {
            this.indexDirectory = indexDirectory;
            this.taxonomyDirectory = taxonomyDirectory;
            this.callback = callback;

            CurrentVersion = null;
            CurrentRevisionFiles = null;

            bool indexExists = DirectoryReader.IndexExists(indexDirectory);
            bool taxonomyExists = DirectoryReader.IndexExists(taxonomyDirectory);

            // Java throws IllegalStateException here; InvalidOperationException is the .NET counterpart.
            if (indexExists != taxonomyExists)
                throw new InvalidOperationException(string.Format("search and taxonomy indexes must either both exist or not: index={0} taxo={1}", indexExists, taxonomyExists));

            if (indexExists)
            {
                IndexCommit indexCommit = IndexReplicationHandler.GetLastCommit(indexDirectory);
                IndexCommit taxonomyCommit = IndexReplicationHandler.GetLastCommit(taxonomyDirectory);

                CurrentRevisionFiles = IndexAndTaxonomyRevision.RevisionFiles(indexCommit, taxonomyCommit);
                CurrentVersion = IndexAndTaxonomyRevision.RevisionVersion(indexCommit, taxonomyCommit);

                WriteToInfoStream(
                    string.Format("constructor(): currentVersion={0} currentRevisionFiles={1}", CurrentVersion, CurrentRevisionFiles),
                    string.Format("constructor(): indexCommit={0} taxoCommit={1}", indexCommit, taxonomyCommit));
            }
        }

        /// <summary>
        /// Installs a revision whose files have already been copied into the client (source) directories.
        /// Taxonomy files are copied into the handler's directories before index files and fsync'd; then the
        /// segments files are copied and fsync'd (taxonomy first), the handler state is updated, segments.gen
        /// is rewritten, old index files are cleaned up and finally the callback is invoked.
        /// </summary>
        /// <param name="version">Version of the revision being installed.</param>
        /// <param name="revisionFiles">Files of the revision; stored as <see cref="CurrentRevisionFiles"/> on success.</param>
        /// <param name="copiedFiles">Names of the copied files, keyed by
        /// <see cref="IndexAndTaxonomyRevision.INDEX_SOURCE"/> and <see cref="IndexAndTaxonomyRevision.TAXONOMY_SOURCE"/>.
        /// NOTE: on failure the segments file names are appended to these lists before cleanup.</param>
        /// <param name="sourceDirectory">Directories holding the copied files, keyed by the same two sources.</param>
        /// <exception cref="IOException">The callback threw (its exception is wrapped as the inner exception).
        /// I/O failures while copying or syncing propagate to the caller after the copied files are cleaned up.</exception>
        public void RevisionReady(string version,
            IDictionary<string, IList<RevisionFile>> revisionFiles,
            IDictionary<string, IList<string>> copiedFiles,
            IDictionary<string, Directory> sourceDirectory)
        {
            #region Java
            //JAVA: Directory taxoClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
            //JAVA: Directory indexClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
            //JAVA: List<String> taxoFiles = copiedFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
            //JAVA: List<String> indexFiles = copiedFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
            //JAVA: String taxoSegmentsFile = IndexReplicationHandler.getSegmentsFile(taxoFiles, true);
            //JAVA: String indexSegmentsFile = IndexReplicationHandler.getSegmentsFile(indexFiles, false);
            //JAVA:
            //JAVA: boolean success = false;
            //JAVA: try {
            //JAVA:   // copy taxonomy files before index files
            //JAVA:   IndexReplicationHandler.copyFiles(taxoClientDir, taxoDir, taxoFiles);
            //JAVA:   IndexReplicationHandler.copyFiles(indexClientDir, indexDir, indexFiles);
            //JAVA:
            //JAVA:   // fsync all copied files (except segmentsFile)
            //JAVA:   if (!taxoFiles.isEmpty()) {
            //JAVA:     taxoDir.sync(taxoFiles);
            //JAVA:   }
            //JAVA:   indexDir.sync(indexFiles);
            //JAVA:
            //JAVA:   // now copy and fsync segmentsFile, taxonomy first because it is ok if a
            //JAVA:   // reader sees a more advanced taxonomy than the index.
            //JAVA:   if (taxoSegmentsFile != null) {
            //JAVA:     taxoClientDir.copy(taxoDir, taxoSegmentsFile, taxoSegmentsFile, IOContext.READONCE);
            //JAVA:   }
            //JAVA:   indexClientDir.copy(indexDir, indexSegmentsFile, indexSegmentsFile, IOContext.READONCE);
            //JAVA:
            //JAVA:   if (taxoSegmentsFile != null) {
            //JAVA:     taxoDir.sync(Collections.singletonList(taxoSegmentsFile));
            //JAVA:   }
            //JAVA:   indexDir.sync(Collections.singletonList(indexSegmentsFile));
            //JAVA:
            //JAVA:   success = true;
            //JAVA: } finally {
            //JAVA:   if (!success) {
            //JAVA:     taxoFiles.add(taxoSegmentsFile); // add it back so it gets deleted too
            //JAVA:     IndexReplicationHandler.cleanupFilesOnFailure(taxoDir, taxoFiles);
            //JAVA:     indexFiles.add(indexSegmentsFile); // add it back so it gets deleted too
            //JAVA:     IndexReplicationHandler.cleanupFilesOnFailure(indexDir, indexFiles);
            //JAVA:   }
            //JAVA: }
            //JAVA:
            //JAVA: // all files have been successfully copied + sync'd. update the handler's state
            //JAVA: currentRevisionFiles = revisionFiles;
            //JAVA: currentVersion = version;
            //JAVA:
            //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
            //JAVA:   infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
            //JAVA:       + " currentRevisionFiles=" + currentRevisionFiles);
            //JAVA: }
            //JAVA:
            //JAVA: // update the segments.gen file
            //JAVA: IndexReplicationHandler.writeSegmentsGen(taxoSegmentsFile, taxoDir);
            //JAVA: IndexReplicationHandler.writeSegmentsGen(indexSegmentsFile, indexDir);
            //JAVA:
            //JAVA: // Cleanup the index directory from old and unused index files.
            //JAVA: // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
            //JAVA: // side-effects, e.g. if it hits sudden IO errors while opening the index
            //JAVA: // (and can end up deleting the entire index). It is not our job to protect
            //JAVA: // against those errors, app will probably hit them elsewhere.
            //JAVA: IndexReplicationHandler.cleanupOldIndexFiles(indexDir, indexSegmentsFile);
            //JAVA: IndexReplicationHandler.cleanupOldIndexFiles(taxoDir, taxoSegmentsFile);
            //JAVA:
            //JAVA: // successfully updated the index, notify the callback that the index is
            //JAVA: // ready.
            //JAVA: if (callback != null) {
            //JAVA:   try {
            //JAVA:     callback.call();
            //JAVA:   } catch (Exception e) {
            //JAVA:     throw new IOException(e);
            //JAVA:   }
            //JAVA: }
            #endregion

            Directory taxonomyClientDirectory = sourceDirectory[IndexAndTaxonomyRevision.TAXONOMY_SOURCE];
            Directory indexClientDirectory = sourceDirectory[IndexAndTaxonomyRevision.INDEX_SOURCE];
            IList<string> taxonomyFiles = copiedFiles[IndexAndTaxonomyRevision.TAXONOMY_SOURCE];
            IList<string> indexFiles = copiedFiles[IndexAndTaxonomyRevision.INDEX_SOURCE];
            // The taxonomy is allowed to have no segments file in this revision (second argument true);
            // the search index must have one (false).
            string taxonomySegmentsFile = IndexReplicationHandler.GetSegmentsFile(taxonomyFiles, true);
            string indexSegmentsFile = IndexReplicationHandler.GetSegmentsFile(indexFiles, false);

            bool success = false;
            try
            {
                // copy taxonomy files before index files
                IndexReplicationHandler.CopyFiles(taxonomyClientDirectory, taxonomyDirectory, taxonomyFiles);
                IndexReplicationHandler.CopyFiles(indexClientDirectory, indexDirectory, indexFiles);

                // fsync all copied files (except segmentsFile)
                if (taxonomyFiles.Any())
                    taxonomyDirectory.Sync(taxonomyFiles);
                indexDirectory.Sync(indexFiles);

                // now copy and fsync segmentsFile, taxonomy first because it is ok if a
                // reader sees a more advanced taxonomy than the index.
                if (taxonomySegmentsFile != null)
                    taxonomyClientDirectory.Copy(taxonomyDirectory, taxonomySegmentsFile, taxonomySegmentsFile, IOContext.READ_ONCE);
                indexClientDirectory.Copy(indexDirectory, indexSegmentsFile, indexSegmentsFile, IOContext.READ_ONCE);

                if (taxonomySegmentsFile != null)
                    taxonomyDirectory.Sync(new[] { taxonomySegmentsFile });
                indexDirectory.Sync(new[] { indexSegmentsFile });

                success = true;
            }
            finally
            {
                if (!success)
                {
                    taxonomyFiles.Add(taxonomySegmentsFile); // add it back so it gets deleted too
                    IndexReplicationHandler.CleanupFilesOnFailure(taxonomyDirectory, taxonomyFiles);
                    indexFiles.Add(indexSegmentsFile); // add it back so it gets deleted too
                    IndexReplicationHandler.CleanupFilesOnFailure(indexDirectory, indexFiles);
                }
            }

            // all files have been successfully copied + sync'd. update the handler's state
            CurrentRevisionFiles = revisionFiles;
            CurrentVersion = version;

            WriteToInfoStream("revisionReady(): currentVersion=" + CurrentVersion + " currentRevisionFiles=" + CurrentRevisionFiles);

            // update the segments.gen file
            IndexReplicationHandler.WriteSegmentsGen(taxonomySegmentsFile, taxonomyDirectory);
            IndexReplicationHandler.WriteSegmentsGen(indexSegmentsFile, indexDirectory);

            // Cleanup the index directory from old and unused index files.
            // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
            // side-effects, e.g. if it hits sudden IO errors while opening the index
            // (and can end up deleting the entire index). It is not our job to protect
            // against those errors, app will probably hit them elsewhere.
            IndexReplicationHandler.CleanupOldIndexFiles(indexDirectory, indexSegmentsFile);
            IndexReplicationHandler.CleanupOldIndexFiles(taxonomyDirectory, taxonomySegmentsFile);

            // successfully updated the index, notify the callback that the index is
            // ready.
            if (callback != null)
            {
                try
                {
                    callback.Invoke();
                }
                catch (Exception e)
                {
                    // Mirrors Java's "throw new IOException(e)": callers only need to handle IOException.
                    throw new IOException(e.Message, e);
                }
            }
        }

        /// <summary>
        /// Writes each of <paramref name="messages"/> to <see cref="InfoStream"/> under
        /// <see cref="INFO_STREAM_COMPONENT"/>, but only when that component is enabled.
        /// </summary>
        /// <param name="messages">The messages to log, in order.</param>
        private void WriteToInfoStream(params string[] messages)
        {
            if (!InfoStream.IsEnabled(INFO_STREAM_COMPONENT))
                return;

            foreach (string message in messages)
                InfoStream.Message(INFO_STREAM_COMPONENT, message);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs b/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs
new file mode 100644
index 0000000..8d32fac
--- /dev/null
+++ b/src/Lucene.Net.Replicator/IndexAndTaxonomyRevision.cs
@@ -0,0 +1,334 @@
//STATUS: DRAFT - 4.8.0

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using Lucene.Net.Facet.Taxonomy.Directory;
using Lucene.Net.Facet.Taxonomy.WriterCache;
using Lucene.Net.Index;
using Lucene.Net.Store;
using Directory = Lucene.Net.Store.Directory;

namespace Lucene.Net.Replicator
{
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements. See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    /// <summary>
    /// A <see cref="IRevision"/> of a single index and taxonomy index files which comprises
    /// the list of files from both indexes. This revision should be used whenever a
    /// pair of search and taxonomy indexes need to be replicated together to
    /// guarantee consistency of both on the replicating (client) side.
    /// </summary>
    /// <remarks>
    /// Lucene.Experimental
    /// </remarks>
    public class IndexAndTaxonomyRevision : IRevision
    {
        #region Java
        //JAVA: private final IndexWriter indexWriter;
        //JAVA: private final SnapshotDirectoryTaxonomyWriter taxoWriter;
        //JAVA: private final IndexCommit indexCommit, taxoCommit;
        //JAVA: private final SnapshotDeletionPolicy indexSDP, taxoSDP;
        //JAVA: private final String version;
        //JAVA: private final Map<String, List<RevisionFile>> sourceFiles;
        #endregion

        /// <summary>Source name under which the search index files are exposed.</summary>
        public const string INDEX_SOURCE = "index";
        /// <summary>Source name under which the taxonomy index files are exposed.</summary>
        public const string TAXONOMY_SOURCE = "taxonomy";

        private readonly IndexWriter indexWriter;
        private readonly SnapshotDirectoryTaxonomyWriter taxonomyWriter;
        private readonly IndexCommit indexCommit, taxonomyCommit;
        private readonly SnapshotDeletionPolicy indexSdp, taxonomySdp;

        /// <summary>
        /// Version of this revision, as produced by <see cref="RevisionVersion"/> for the snapshotted commits.
        /// </summary>
        public string Version { get; private set; }

        /// <summary>
        /// Files of this revision keyed by <see cref="INDEX_SOURCE"/> and <see cref="TAXONOMY_SOURCE"/>,
        /// as produced by <see cref="RevisionFiles"/>.
        /// </summary>
        public IDictionary<string, IList<RevisionFile>> SourceFiles { get; private set; }

        /// <summary>
        /// Constructor over the given index and taxonomy writers. Snapshots the current commit of each
        /// through its <see cref="SnapshotDeletionPolicy"/>; the snapshots are held until <see cref="Release"/>.
        /// </summary>
        /// <param name="indexWriter">Writer of the search index; must be configured with a <see cref="SnapshotDeletionPolicy"/>.</param>
        /// <param name="taxonomyWriter">Writer of the taxonomy index.</param>
        /// <exception cref="ArgumentNullException">Either writer is <c>null</c>.</exception>
        /// <exception cref="ArgumentException">The index writer is not configured with a <see cref="SnapshotDeletionPolicy"/>.</exception>
        /// <exception cref="IOException">An I/O error occurred while taking the snapshots (declared by the Java original).</exception>
        public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxonomyWriter)
        {
            #region Java
            //JAVA: /**
            //JAVA:  * Constructor over the given {@link IndexWriter}. Uses the last
            //JAVA:  * {@link IndexCommit} found in the {@link Directory} managed by the given
            //JAVA:  * writer.
            //JAVA:  */
            //JAVA: public IndexAndTaxonomyRevision(IndexWriter indexWriter, SnapshotDirectoryTaxonomyWriter taxoWriter)
            //JAVA:     throws IOException {
            //JAVA:   IndexDeletionPolicy delPolicy = indexWriter.getConfig().getIndexDeletionPolicy();
            //JAVA:   if (!(delPolicy instanceof SnapshotDeletionPolicy)) {
            //JAVA:     throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy");
            //JAVA:   }
            //JAVA:   this.indexWriter = indexWriter;
            //JAVA:   this.taxoWriter = taxoWriter;
            //JAVA:   this.indexSDP = (SnapshotDeletionPolicy) delPolicy;
            //JAVA:   this.taxoSDP = taxoWriter.getDeletionPolicy();
            //JAVA:   this.indexCommit = indexSDP.snapshot();
            //JAVA:   this.taxoCommit = taxoSDP.snapshot();
            //JAVA:   this.version = revisionVersion(indexCommit, taxoCommit);
            //JAVA:   this.sourceFiles = revisionFiles(indexCommit, taxoCommit);
            //JAVA: }
            #endregion

            if (indexWriter == null)
                throw new ArgumentNullException("indexWriter");
            if (taxonomyWriter == null)
                throw new ArgumentNullException("taxonomyWriter");

            this.indexSdp = indexWriter.Config.IndexDeletionPolicy as SnapshotDeletionPolicy;
            if (indexSdp == null)
                throw new ArgumentException("IndexWriter must be created with SnapshotDeletionPolicy", "indexWriter");

            this.indexWriter = indexWriter;
            this.taxonomyWriter = taxonomyWriter;
            this.taxonomySdp = taxonomyWriter.DeletionPolicy;
            this.indexCommit = indexSdp.Snapshot();
            this.taxonomyCommit = taxonomySdp.Snapshot();
            this.Version = RevisionVersion(indexCommit, taxonomyCommit);
            this.SourceFiles = RevisionFiles(indexCommit, taxonomyCommit);
        }

        /// <summary>
        /// Compares this revision to a version string produced by <see cref="RevisionVersion"/>:
        /// first by index generation and, only when those are equal, by taxonomy generation.
        /// </summary>
        /// <param name="version">A version of the form <c>indexGen:taxonomyGen</c>, both hexadecimal.</param>
        /// <returns>A negative number, zero or a positive number when this revision is older than,
        /// the same as, or newer than <paramref name="version"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="version"/> is <c>null</c>.</exception>
        /// <exception cref="FormatException"><paramref name="version"/> is not of the expected form.</exception>
        public int CompareTo(string version)
        {
            #region Java
            //JAVA: public int compareTo(String version) {
            //JAVA:   final String[] parts = version.split(":");
            //JAVA:   final long indexGen = Long.parseLong(parts[0], RADIX);
            //JAVA:   final long taxoGen = Long.parseLong(parts[1], RADIX);
            //JAVA:   final long indexCommitGen = indexCommit.getGeneration();
            //JAVA:   final long taxoCommitGen = taxoCommit.getGeneration();
            //JAVA:
            //JAVA:   // if the index generation is not the same as this commit's generation,
            //JAVA:   // compare by it. Otherwise, compare by the taxonomy generation.
            //JAVA:   if (indexCommitGen < indexGen) {
            //JAVA:     return -1;
            //JAVA:   } else if (indexCommitGen > indexGen) {
            //JAVA:     return 1;
            //JAVA:   } else {
            //JAVA:     return taxoCommitGen < taxoGen ? -1 : (taxoCommitGen > taxoGen ? 1 : 0);
            //JAVA:   }
            //JAVA: }
            #endregion

            if (version == null)
                throw new ArgumentNullException("version");

            string[] parts = version.Split(':');
            if (parts.Length < 2)
                throw new FormatException(string.Format(CultureInfo.InvariantCulture, "invalid revision version; expected=<indexGen>:<taxonomyGen> got={0}", version));

            long indexGen = long.Parse(parts[0], NumberStyles.HexNumber, CultureInfo.InvariantCulture);
            long taxonomyGen = long.Parse(parts[1], NumberStyles.HexNumber, CultureInfo.InvariantCulture);

            // If the index generation differs from this commit's, compare by it;
            // otherwise compare by the taxonomy generation.
            int cmp = indexCommit.Generation.CompareTo(indexGen);
            return cmp != 0 ? cmp : taxonomyCommit.Generation.CompareTo(taxonomyGen);
        }

        /// <summary>
        /// Compares this revision to another <see cref="IndexAndTaxonomyRevision"/>, first by index commit
        /// and then by taxonomy commit.
        /// </summary>
        /// <param name="other">The revision to compare with; must be an <see cref="IndexAndTaxonomyRevision"/>.</param>
        /// <returns>The index commit comparison if non-zero, otherwise the taxonomy commit comparison.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="other"/> is <c>null</c>.</exception>
        /// <exception cref="InvalidCastException"><paramref name="other"/> is a different <see cref="IRevision"/>
        /// implementation. (This is inherited from the Java original, which casts unconditionally.)</exception>
        public int CompareTo(IRevision other)
        {
            #region Java
            //JAVA: public int compareTo(Revision o) {
            //JAVA:   IndexAndTaxonomyRevision other = (IndexAndTaxonomyRevision) o;
            //JAVA:   int cmp = indexCommit.compareTo(other.indexCommit);
            //JAVA:   return cmp != 0 ? cmp : taxoCommit.compareTo(other.taxoCommit);
            //JAVA: }
            #endregion

            if (other == null)
                throw new ArgumentNullException("other");

            IndexAndTaxonomyRevision otherRevision = other as IndexAndTaxonomyRevision;
            if (otherRevision == null)
                throw new InvalidCastException(string.Format(CultureInfo.InvariantCulture,
                    "{0} can only be compared with another {0}; got {1}",
                    typeof(IndexAndTaxonomyRevision).Name, other.GetType().FullName));

            int cmp = indexCommit.CompareTo(otherRevision.indexCommit);
            return cmp != 0 ? cmp : taxonomyCommit.CompareTo(otherRevision.taxonomyCommit);
        }

        /// <summary>
        /// Opens a read-only stream over <paramref name="fileName"/> from the snapshotted commit of the given source.
        /// </summary>
        /// <param name="source">Either <see cref="INDEX_SOURCE"/> or <see cref="TAXONOMY_SOURCE"/>.</param>
        /// <param name="fileName">Name of the file within that commit's directory.</param>
        /// <returns>A stream over the file; the caller is responsible for disposing it.</returns>
        /// <exception cref="ArgumentException"><paramref name="source"/> is not one of the two known sources.</exception>
        /// <exception cref="IOException">The file could not be opened.</exception>
        public Stream Open(string source, string fileName)
        {
            #region Java
            //JAVA: public InputStream open(String source, String fileName) throws IOException {
            //JAVA:   assert source.equals(INDEX_SOURCE) || source.equals(TAXONOMY_SOURCE) : "invalid source; expected=(" + INDEX_SOURCE
            //JAVA:   + " or " + TAXONOMY_SOURCE + ") got=" + source;
            //JAVA:   IndexCommit ic = source.equals(INDEX_SOURCE) ? indexCommit : taxoCommit;
            //JAVA:   return new IndexInputStream(ic.getDirectory().openInput(fileName, IOContext.READONCE));
            //JAVA: }
            #endregion

            // Validate explicitly: a Debug.Assert is compiled out of release builds, where an
            // unknown source would otherwise silently read from the taxonomy commit.
            IndexCommit commit;
            if (string.Equals(source, INDEX_SOURCE, StringComparison.Ordinal))
                commit = indexCommit;
            else if (string.Equals(source, TAXONOMY_SOURCE, StringComparison.Ordinal))
                commit = taxonomyCommit;
            else
                throw new ArgumentException(string.Format(CultureInfo.InvariantCulture,
                    "invalid source; expected=({0} or {1}) got={2}", INDEX_SOURCE, TAXONOMY_SOURCE, source), "source");

            return new IndexInputStream(commit.Directory.OpenInput(fileName, IOContext.READ_ONCE));
        }

        /// <summary>
        /// Releases the snapshots of both commits, then asks both writers to delete files that are no
        /// longer referenced. The taxonomy steps run even if the corresponding index steps throw.
        /// </summary>
        /// <exception cref="IOException">An I/O error occurred while releasing or deleting files.</exception>
        public void Release()
        {
            #region Java
            //JAVA: public void release() throws IOException {
            //JAVA:   try {
            //JAVA:     indexSDP.release(indexCommit);
            //JAVA:   } finally {
            //JAVA:     taxoSDP.release(taxoCommit);
            //JAVA:   }
            //JAVA:
            //JAVA:   try {
            //JAVA:     indexWriter.deleteUnusedFiles();
            //JAVA:   } finally {
            //JAVA:     taxoWriter.getIndexWriter().deleteUnusedFiles();
            //JAVA:   }
            //JAVA: }
            #endregion

            try
            {
                indexSdp.Release(indexCommit);
            }
            finally
            {
                taxonomySdp.Release(taxonomyCommit);
            }

            try
            {
                indexWriter.DeleteUnusedFiles();
            }
            finally
            {
                taxonomyWriter.IndexWriter.DeleteUnusedFiles();
            }
        }

        //.NET NOTE: Changed doc comment as the JAVA one seems to be a bit too much copy/paste
        /// <summary>
        /// Returns a map of the revision files from the given <see cref="IndexCommit"/>s of the search and taxonomy indexes.
        /// </summary>
        /// <param name="indexCommit">Commit of the search index.</param>
        /// <param name="taxonomyCommit">Commit of the taxonomy index.</param>
        /// <returns>A map with exactly two entries, keyed by <see cref="INDEX_SOURCE"/> and <see cref="TAXONOMY_SOURCE"/>,
        /// each holding the file list that <see cref="IndexRevision.RevisionFiles"/> returns for that commit.</returns>
        /// <exception cref="IOException">An I/O error occurred while listing the commit files.</exception>
        public static IDictionary<string, IList<RevisionFile>> RevisionFiles(IndexCommit indexCommit, IndexCommit taxonomyCommit)
        {
            #region Java
            //JAVA: /** Returns a singleton map of the revision files from the given {@link IndexCommit}. */
            //JAVA: public static Map<String, List<RevisionFile>> revisionFiles(IndexCommit indexCommit, IndexCommit taxoCommit)
            //JAVA:     throws IOException {
            //JAVA:   HashMap<String,List<RevisionFile>> files = new HashMap<>();
            //JAVA:   files.put(INDEX_SOURCE, IndexRevision.revisionFiles(indexCommit).values().iterator().next());
            //JAVA:   files.put(TAXONOMY_SOURCE, IndexRevision.revisionFiles(taxoCommit).values().iterator().next());
            //JAVA:   return files;
            //JAVA: }
            #endregion

            return new Dictionary<string, IList<RevisionFile>>
            {
                { INDEX_SOURCE, IndexRevision.RevisionFiles(indexCommit).Values.First() },
                { TAXONOMY_SOURCE, IndexRevision.RevisionFiles(taxonomyCommit).Values.First() }
            };
        }

        /// <summary>
        /// Returns a string representation of a revision's version from the given
        /// <see cref="IndexCommit"/>s of the search and taxonomy indexes.
        /// </summary>
        /// <param name="indexCommit">Commit of the search index.</param>
        /// <param name="taxonomyCommit">Commit of the taxonomy index.</param>
        /// <returns>The two commit generations as uppercase hexadecimal, separated by <c>':'</c>
        /// (e.g. <c>"1A:3"</c>). <see cref="CompareTo(string)"/> parses this with
        /// <see cref="NumberStyles.HexNumber"/>, which accepts either letter case.</returns>
        public static string RevisionVersion(IndexCommit indexCommit, IndexCommit taxonomyCommit)
        {
            #region Java
            //JAVA: public static String revisionVersion(IndexCommit indexCommit, IndexCommit taxoCommit) {
            //JAVA:   return Long.toString(indexCommit.getGeneration(), RADIX) + ":" + Long.toString(taxoCommit.getGeneration(), RADIX);
            //JAVA: }
            #endregion

            return string.Format(CultureInfo.InvariantCulture, "{0:X}:{1:X}", indexCommit.Generation, taxonomyCommit.Generation);
        }

        /// <summary>
        /// A <see cref="DirectoryTaxonomyWriter"/> which sets the underlying
        /// <see cref="IndexWriter"/>'s <see cref="IndexDeletionPolicy"/> to
        /// <see cref="SnapshotDeletionPolicy"/>.
        /// </summary>
        public class SnapshotDirectoryTaxonomyWriter : DirectoryTaxonomyWriter
        {
            /// <summary>
            /// Gets the <see cref="SnapshotDeletionPolicy"/> used by the underlying <see cref="IndexWriter"/>.
            /// Assigned when <see cref="CreateIndexWriterConfig"/> runs.
            /// </summary>
            public SnapshotDeletionPolicy DeletionPolicy { get; private set; }

            /// <summary>
            /// Gets the <see cref="IndexWriter"/> used by this <see cref="DirectoryTaxonomyWriter"/>.
            /// Assigned when <see cref="OpenIndexWriter"/> runs.
            /// </summary>
            public IndexWriter IndexWriter { get; private set; }

            /// <summary>
            /// Creates the writer; see <see cref="DirectoryTaxonomyWriter(Directory, OpenMode, ITaxonomyWriterCache)"/>.
            /// </summary>
            /// <param name="directory">Directory holding the taxonomy index.</param>
            /// <param name="openMode">How to open the taxonomy index.</param>
            /// <param name="cache">The taxonomy writer cache to use.</param>
            /// <exception cref="IOException">An I/O error occurred while opening the taxonomy index.</exception>
            public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode, ITaxonomyWriterCache cache)
                : base(directory, openMode, cache)
            {
            }

            /// <summary>
            /// Creates the writer; see <see cref="DirectoryTaxonomyWriter(Directory, OpenMode)"/>.
            /// </summary>
            /// <param name="directory">Directory holding the taxonomy index.</param>
            /// <param name="openMode">How to open the taxonomy index; defaults to <see cref="OpenMode.CREATE_OR_APPEND"/>.</param>
            /// <exception cref="IOException">An I/O error occurred while opening the taxonomy index.</exception>
            public SnapshotDirectoryTaxonomyWriter(Directory directory, OpenMode openMode = OpenMode.CREATE_OR_APPEND)
                : base(directory, openMode)
            {
            }

            /// <summary>
            /// Wraps the base configuration's deletion policy in a <see cref="SnapshotDeletionPolicy"/>
            /// and records it in <see cref="DeletionPolicy"/>.
            /// </summary>
            /// <param name="openMode">How the taxonomy index is being opened.</param>
            /// <returns>The base configuration with its deletion policy replaced.</returns>
            protected override IndexWriterConfig CreateIndexWriterConfig(OpenMode openMode)
            {
                IndexWriterConfig conf = base.CreateIndexWriterConfig(openMode);
                conf.IndexDeletionPolicy = DeletionPolicy = new SnapshotDeletionPolicy(conf.IndexDeletionPolicy);
                return conf;
            }

            /// <summary>
            /// Opens the index writer via the base implementation and records it in <see cref="IndexWriter"/>.
            /// </summary>
            /// <param name="directory">Directory holding the taxonomy index.</param>
            /// <param name="config">Configuration for the writer.</param>
            /// <returns>The opened writer.</returns>
            protected override IndexWriter OpenIndexWriter(Directory directory, IndexWriterConfig config)
            {
                return IndexWriter = base.OpenIndexWriter(directory, config);
            }
        }

    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexInputInputStream.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexInputInputStream.cs b/src/Lucene.Net.Replicator/IndexInputInputStream.cs
new file mode 100644
index 0000000..95f6e1c
--- /dev/null
+++
b/src/Lucene.Net.Replicator/IndexInputInputStream.cs
@@ -0,0 +1,102 @@
//STATUS: INPROGRESS - 4.8.0

using System;
using System.IO;
using Lucene.Net.Store;

namespace Lucene.Net.Replicator
{
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements. See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License. You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    /// <summary>
    /// A read-only, seekable <see cref="Stream"/> over an <see cref="IndexInput"/>.
    /// Disposing the stream disposes the wrapped <see cref="IndexInput"/>.
    /// </summary>
    /// <remarks>
    /// Lucene.Experimental
    /// </remarks>
    // NOTE(review): declared in IndexInputInputStream.cs although the type is named IndexInputStream.
    public class IndexInputStream : Stream
    {
        private readonly IndexInput input;

        /// <summary>
        /// Wraps <paramref name="input"/>; the stream takes ownership and disposes it.
        /// </summary>
        /// <param name="input">The input to read from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="input"/> is <c>null</c>.</exception>
        public IndexInputStream(IndexInput input)
        {
            if (input == null)
                throw new ArgumentNullException("input");
            this.input = input;
        }

        /// <summary>
        /// Does nothing: the stream is read-only, and flushing a read-only stream is valid per the
        /// <see cref="Stream.Flush"/> contract.
        /// </summary>
        public override void Flush()
        {
        }

        /// <summary>
        /// Moves the read position.
        /// </summary>
        /// <param name="offset">Byte offset relative to <paramref name="origin"/>; normally &lt;= 0 for <see cref="SeekOrigin.End"/>.</param>
        /// <param name="origin">Reference point for <paramref name="offset"/>.</param>
        /// <returns>The new position.</returns>
        /// <exception cref="ArgumentException"><paramref name="origin"/> is not a valid <see cref="SeekOrigin"/>.</exception>
        /// <exception cref="IOException">The target position would be before the start of the stream.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            long target;
            switch (origin)
            {
                case SeekOrigin.Begin:
                    target = offset;
                    break;
                case SeekOrigin.Current:
                    target = Position + offset;
                    break;
                case SeekOrigin.End:
                    // Relative to the end of the stream: add (not subtract) the offset.
                    target = Length + offset;
                    break;
                default:
                    throw new ArgumentException("Invalid seek origin.", "origin");
            }

            if (target < 0)
                throw new IOException("Cannot seek before the beginning of the stream.");

            Position = target;
            return Position;
        }

        /// <summary>
        /// Not supported: the stream is read-only.
        /// </summary>
        /// <exception cref="NotSupportedException">Always.</exception>
        public override void SetLength(long value)
        {
            throw new NotSupportedException("Cannot change length of a readonly stream.");
        }

        /// <summary>
        /// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/> starting at <paramref name="offset"/>.
        /// </summary>
        /// <returns>The number of bytes actually read; 0 at end of stream.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The buffer is too small for <paramref name="offset"/> + <paramref name="count"/>.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset", "Offset must be non-negative.");
            if (count < 0)
                throw new ArgumentOutOfRangeException("count", "Count must be non-negative.");
            if (buffer.Length - offset < count)
                throw new ArgumentException("The buffer is too small for the requested offset and count.");

            // Compute in long before narrowing so inputs larger than 2 GB do not overflow;
            // the result is bounded by count, so the cast to int is safe.
            long available = input.Length - input.GetFilePointer();
            int toRead = (int)Math.Min(available, count);
            if (toRead <= 0)
                return 0;

            input.ReadBytes(buffer, offset, toRead);
            // Stream.Read must report the bytes actually read (callers such as CopyTo rely on it).
            return toRead;
        }

        /// <summary>
        /// Not supported: the stream is read-only.
        /// </summary>
        /// <exception cref="NotSupportedException">Always.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException("Cannot write to a readonly stream.");
        }

        /// <summary>Always <c>true</c>.</summary>
        public override bool CanRead { get { return true; } }
        /// <summary>Always <c>true</c>.</summary>
        public override bool CanSeek { get { return true; } }
        /// <summary>Always <c>false</c>.</summary>
        public override bool CanWrite { get { return false; } }
        /// <summary>Length of the wrapped input in bytes.</summary>
        public override long Length { get { return input.Length; } }

        /// <summary>
        /// Current read position, i.e. the file pointer of the wrapped input.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">Set to a negative value.</exception>
        public override long Position
        {
            get { return input.GetFilePointer(); }
            set
            {
                if (value < 0)
                    throw new ArgumentOutOfRangeException("value", "Position must be non-negative.");
                input.Seek(value);
            }
        }

        /// <summary>
        /// Disposes the wrapped <see cref="IndexInput"/> when <paramref name="disposing"/> is <c>true</c>.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                input.Dispose();
            }
            base.Dispose(disposing);
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexReplicationHandler.cs
----------------------------------------------------------------------
diff --git a/src/Lucene.Net.Replicator/IndexReplicationHandler.cs
b/src/Lucene.Net.Replicator/IndexReplicationHandler.cs new file mode 100644 index 0000000..474361a --- /dev/null +++ b/src/Lucene.Net.Replicator/IndexReplicationHandler.cs @@ -0,0 +1,510 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using Lucene.Net.Index; +using Lucene.Net.Store; +using Lucene.Net.Support; +using Lucene.Net.Util; +using Directory = Lucene.Net.Store.Directory; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// A <see cref="IReplicationHandler"/> for replication of an index. Implements + /// <see cref="RevisionReady"/> by copying the files pointed by the client resolver to + /// the index <see cref="Store.Directory"/> and then touches the index with + /// <see cref="IndexWriter"/> to make sure any unused files are deleted. + /// </summary> + /// <remarks> + /// <para> + /// This handler assumes that <see cref="IndexWriter"/> is not opened by + /// another process on the index directory.
In fact, opening an + /// <see cref="IndexWriter"/> on the same directory to which files are copied can lead + /// to undefined behavior, where some or all the files will be deleted, override + /// other files or simply create a mess. When you replicate an index, it is best + /// if the index is never modified by <see cref="IndexWriter"/>, except the one that is + /// open on the source index, from which you replicate. + /// </para> + /// <para> + /// This handler notifies the application via a provided <see cref="Func{TResult}"/> callback when an + /// updated index commit was made available for it. + /// </para> + /// + /// Lucene.Experimental + /// </remarks> + public class IndexReplicationHandler : IReplicationHandler + { + /// <summary> + /// The component used to log messages to the {@link InfoStream#getDefault() + /// default} <seealso cref="InfoStream"/>. + /// </summary> + public const string INFO_STREAM_COMPONENT = "IndexReplicationHandler"; + + private readonly Directory indexDirectory; + private readonly Func<bool?> callback; + private InfoStream infoStream; + + public string CurrentVersion { get; private set; } + public IDictionary<string, IList<RevisionFile>> CurrentRevisionFiles { get; private set; } + + public InfoStream InfoStream + { + get { return infoStream; } + set { infoStream = value ?? InfoStream.NO_OUTPUT; } + } + + /// <summary> + /// Constructor with the given index directory and callback to notify when the + /// indexes were updated. + /// </summary> + /// <param name="indexDirectory">the index directory that replicated files are copied into</param> + /// <param name="callback">invoked after a revision has been applied; may be null</param> + // .NET NOTE: Java uses a Callable<Boolean>, however it never uses the returned value.
+ public IndexReplicationHandler(Directory indexDirectory, Func<bool?> callback) + { + #region JAVA + //JAVA: this.callback = callback; + //JAVA: this.indexDir = indexDir; + //JAVA: currentRevisionFiles = null; + //JAVA: currentVersion = null; + //JAVA: if (DirectoryReader.indexExists(indexDir)) + //JAVA: { + //JAVA: final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir); + //JAVA: final IndexCommit commit = commits.get(commits.size() - 1); + //JAVA: currentRevisionFiles = IndexRevision.revisionFiles(commit); + //JAVA: currentVersion = IndexRevision.revisionVersion(commit); + //JAVA: final InfoStream infoStream = InfoStream.getDefault(); + //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) + //JAVA: { + //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion + //JAVA: + " currentRevisionFiles=" + currentRevisionFiles); + //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + commit); + //JAVA: } + //JAVA: } + #endregion + + this.InfoStream = InfoStream.Default; + this.callback = callback; + this.indexDirectory = indexDirectory; + + CurrentVersion = null; + CurrentRevisionFiles = null; + + if (DirectoryReader.IndexExists(indexDirectory)) + { + IList<IndexCommit> commits = DirectoryReader.ListCommits(indexDirectory); + IndexCommit commit = commits.Last(); + + CurrentVersion = IndexRevision.RevisionVersion(commit); + CurrentRevisionFiles = IndexRevision.RevisionFiles(commit); + + WriteToInfoStream( + string.Format("constructor(): currentVersion={0} currentRevisionFiles={1}", CurrentVersion, CurrentRevisionFiles), + string.Format("constructor(): commit={0}", commit)); + } + } + + public void RevisionReady(string version, + IDictionary<string, IList<RevisionFile>> revisionFiles, + IDictionary<string, IList<string>> copiedFiles, + IDictionary<string, Directory> sourceDirectory) + { + #region Java + //JAVA: if (revisionFiles.size() > 1) { + //JAVA: throw new
IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet()); + //JAVA: } + //JAVA: + //JAVA: Directory clientDir = sourceDirectory.values().iterator().next(); + //JAVA: List<String> files = copiedFiles.values().iterator().next(); + //JAVA: String segmentsFile = getSegmentsFile(files, false); + //JAVA: + //JAVA: boolean success = false; + //JAVA: try { + //JAVA: // copy files from the client to index directory + //JAVA: copyFiles(clientDir, indexDir, files); + //JAVA: + //JAVA: // fsync all copied files (except segmentsFile) + //JAVA: indexDir.sync(files); + //JAVA: + //JAVA: // now copy and fsync segmentsFile + //JAVA: clientDir.copy(indexDir, segmentsFile, segmentsFile, IOContext.READONCE); + //JAVA: indexDir.sync(Collections.singletonList(segmentsFile)); + //JAVA: + //JAVA: success = true; + //JAVA: } finally { + //JAVA: if (!success) { + //JAVA: files.add(segmentsFile); // add it back so it gets deleted too + //JAVA: cleanupFilesOnFailure(indexDir, files); + //JAVA: } + //JAVA: } + //JAVA: + //JAVA: // all files have been successfully copied + sync'd. update the handler's state + //JAVA: currentRevisionFiles = revisionFiles; + //JAVA: currentVersion = version; + //JAVA: + //JAVA: if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) { + //JAVA: infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion + //JAVA: + " currentRevisionFiles=" + currentRevisionFiles); + //JAVA: } + //JAVA: + //JAVA: // update the segments.gen file + //JAVA: writeSegmentsGen(segmentsFile, indexDir); + //JAVA: + //JAVA: // Cleanup the index directory from old and unused index files. + //JAVA: // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have + //JAVA: // side-effects, e.g. if it hits sudden IO errors while opening the index + //JAVA: // (and can end up deleting the entire index). It is not our job to protect + //JAVA: // against those errors, app will probably hit them elsewhere.
+ //JAVA: cleanupOldIndexFiles(indexDir, segmentsFile); + //JAVA: + //JAVA: // successfully updated the index, notify the callback that the index is + //JAVA: // ready. + //JAVA: if (callback != null) { + //JAVA: try { + //JAVA: callback.call(); + //JAVA: } catch (Exception e) { + //JAVA: throw new IOException(e); + //JAVA: } + //JAVA: } + #endregion + //TODO: ArgumentOutOfRangeException more suited? + if (revisionFiles.Count > 1) throw new ArgumentException(string.Format("this handler handles only a single source; got {0}", string.Join(", ", revisionFiles.Keys))); + + Directory clientDirectory = sourceDirectory.Values.First(); + IList<string> files = copiedFiles.Values.First(); + string segmentsFile = GetSegmentsFile(files, false); + + bool success = false; + try + { + // copy files from the client to index directory + CopyFiles(clientDirectory, indexDirectory, files); + + // fsync all copied files (except segmentsFile) + indexDirectory.Sync(files); + + // now copy and fsync segmentsFile + clientDirectory.Copy(indexDirectory, segmentsFile, segmentsFile, IOContext.READ_ONCE); + indexDirectory.Sync(new[] { segmentsFile }); + + success = true; + } + finally + { + if (!success) + { + files.Add(segmentsFile); // add it back so it gets deleted too + CleanupFilesOnFailure(indexDirectory, files); + } + } + + // all files have been successfully copied + sync'd. update the handler's state + CurrentRevisionFiles = revisionFiles; + CurrentVersion = version; + + WriteToInfoStream(string.Format("revisionReady(): currentVersion={0} currentRevisionFiles={1}", CurrentVersion, CurrentRevisionFiles)); + + // update the segments.gen file + WriteSegmentsGen(segmentsFile, indexDirectory); + + // Cleanup the index directory from old and unused index files. + // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have + // side-effects, e.g. if it hits sudden IO errors while opening the index + // (and can end up deleting the entire index).
It is not our job to protect + // against those errors, app will probably hit them elsewhere. + CleanupOldIndexFiles(indexDirectory, segmentsFile); + + // successfully updated the index, notify the callback that the index is + // ready. + if (callback != null) { + try { + callback.Invoke(); + } catch (Exception e) { + throw new IOException(e.Message, e); + } + } + } + + // .NET NOTE: Utility Method + private void WriteToInfoStream(params string[] messages) + { + if (!InfoStream.IsEnabled(INFO_STREAM_COMPONENT)) + return; + + foreach (string message in messages) + InfoStream.Message(INFO_STREAM_COMPONENT, message); + } + + /// <summary> + /// Returns the last <see cref="IndexCommit"/> found in the <see cref="Directory"/>, or + /// <code>null</code> if there are no commits. + /// </summary> + /// <param name="directory">the directory to look for commits in</param> + /// <returns>the last commit, or <code>null</code> if no index exists</returns> + /// <exception cref="System.IO.IOException"></exception> + public static IndexCommit GetLastCommit(Directory directory) + { + #region Java + //JAVA: try { + //JAVA: if (DirectoryReader.indexExists(dir)) { + //JAVA: List<IndexCommit> commits = DirectoryReader.listCommits(dir); + //JAVA: // listCommits guarantees that we get at least one commit back, or + //JAVA: // IndexNotFoundException which we handle below + //JAVA: return commits.get(commits.size() - 1); + //JAVA: } + //JAVA: } catch (IndexNotFoundException e) { + //JAVA: // ignore the exception and return null + //JAVA: } + //JAVA: return null; + #endregion + + try + { + // ListCommits guarantees at least one commit back, or throws IndexNotFoundException which we handle below + return DirectoryReader.IndexExists(directory) + ? DirectoryReader.ListCommits(directory).Last() + : null; + } + catch (IndexNotFoundException) + { + // ignore the exception and return null + } + return null; + } + + /// <summary> + /// Verifies that the last file is segments_N and fails otherwise. It also + /// removes and returns the file from the list, because it needs to be handled + /// last, after all files.
This is important in order to guarantee that if a + /// reader sees the new segments_N, all other segment files are already on + /// stable storage. + /// <para> + /// The reason why the code fails instead of putting segments_N file last is + /// that this indicates an error in the Revision implementation. + /// </para> + /// </summary> + /// <param name="files">the files to copy; segments_N must be last and is removed from this list</param> + /// <param name="allowEmpty">whether an empty list is allowed, in which case <code>null</code> is returned</param> + /// <returns>the segments_N file name, or <code>null</code> for an allowed empty list</returns> + public static string GetSegmentsFile(IList<string> files, bool allowEmpty) + { + #region Java + //JAVA: if (files.isEmpty()) { + //JAVA: if (allowEmpty) { + //JAVA: return null; + //JAVA: } else { + //JAVA: throw new IllegalStateException("empty list of files not allowed"); + //JAVA: } + //JAVA: } + //JAVA: + //JAVA: String segmentsFile = files.remove(files.size() - 1); + //JAVA: if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.SEGMENTS_GEN)) { + //JAVA: throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile + //JAVA: + "; check your Revision implementation!"); + //JAVA: } + //JAVA: return segmentsFile; + #endregion + + if (!files.Any()) + { + if (allowEmpty) + return null; + throw new InvalidOperationException("empty list of files not allowed"); + } + + string segmentsFile = files.Last(); + //NOTE: intentionally mutates the caller's list; RevisionReady adds segments_N back on failure. + files.RemoveAt(files.Count - 1); + if (!segmentsFile.StartsWith(IndexFileNames.SEGMENTS, StringComparison.Ordinal) || segmentsFile.Equals(IndexFileNames.SEGMENTS_GEN)) + { + throw new InvalidOperationException( + string.Format("last file to copy+sync must be segments_N but got {0}; check your Revision implementation!", segmentsFile)); + } + return segmentsFile; + } + + /// <summary> + /// Cleanup the index directory by deleting all given files. Called when file + /// copy or sync failed.
+ /// </summary> + /// <param name="directory">the directory to delete the files from</param> + /// <param name="files">the files to delete; individual delete failures are ignored</param> + public static void CleanupFilesOnFailure(Directory directory, IList<string> files) + { + #region Java + //JAVA: for (String file : files) { + //JAVA: try { + //JAVA: dir.deleteFile(file); + //JAVA: } catch (Throwable t) { + //JAVA: // suppress any exception because if we're here, it means copy + //JAVA: // failed, and we must cleanup after ourselves. + //JAVA: } + //JAVA: } + #endregion + + foreach (string file in files) + { + try + { + directory.DeleteFile(file); + } + catch + { + // suppress any exception because if we're here, it means copy + // failed, and we must cleanup after ourselves. + } + } + } + + public static void CleanupOldIndexFiles(Directory directory, string segmentsFile) + { + #region Java + //JAVA: try { + //JAVA: IndexCommit commit = getLastCommit(dir); + //JAVA: // commit == null means weird IO errors occurred, ignore them + //JAVA: // if there were any IO errors reading the expected commit point (i.e. + //JAVA: // segments files mismatch), then ignore that commit either. + //JAVA: if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) { + //JAVA: Set<String> commitFiles = new HashSet<>(); + //JAVA: commitFiles.addAll(commit.getFileNames()); + //JAVA: commitFiles.add(IndexFileNames.SEGMENTS_GEN); + //JAVA: Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher(""); + //JAVA: for (String file : dir.listAll()) { + //JAVA: if (!commitFiles.contains(file) + //JAVA: && (matcher.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS))) { + //JAVA: try { + //JAVA: dir.deleteFile(file); + //JAVA: } catch (Throwable t) { + //JAVA: // suppress, it's just a best effort + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: } + //JAVA: } catch (Throwable t) { + //JAVA: // ignore any errors that happens during this state and only log it. this + //JAVA: // cleanup will have a chance to succeed the next time we get a new + //JAVA: // revision.
+ //JAVA: } + #endregion + + try + { + IndexCommit commit = GetLastCommit(directory); + // commit == null means weird IO errors occurred, ignore them + // if there were any IO errors reading the expected commit point (i.e. + // segments files mismatch), then ignore that commit either. + + if (commit != null && commit.SegmentsFileName.Equals(segmentsFile)) + { + HashSet<string> commitFiles = new HashSet<string>( commit.FileNames + .Union(new[] {IndexFileNames.SEGMENTS_GEN})); + + Regex matcher = new Regex("^(?:" + IndexFileNames.CODEC_FILE_PATTERN.ToString() + ")\\z", IndexFileNames.CODEC_FILE_PATTERN.Options & ~RegexOptions.Compiled); // anchored: Java's Matcher.matches() requires a whole-name match, Regex.IsMatch alone would match substrings (e.g. "my_file.txt") + foreach (string file in directory.ListAll() + .Where(file => !commitFiles.Contains(file) && (matcher.IsMatch(file) || file.StartsWith(IndexFileNames.SEGMENTS, StringComparison.Ordinal)))) + { + try + { + directory.DeleteFile(file); + } + catch + { + // suppress, it's just a best effort + } + } + + } + } + catch + { + // ignore any errors that happen during this stage (nothing is logged). this + // cleanup will have a chance to succeed the next time we get a new + // revision. + } + } + + /// <summary> + /// Copies the given files from <paramref name="source"/> to <paramref name="target"/>; does nothing if both are the same directory. + /// </summary> + /// <param name="source">the directory to copy from</param> + /// <param name="target">the directory to copy to</param> + /// <param name="files">the names of the files to copy</param> + /// <exception cref="System.IO.IOException"></exception> + public static void CopyFiles(Directory source, Directory target, IList<string> files) + { + #region Java + //JAVA: if (!source.equals(target)) { + //JAVA: for (String file : files) { + //JAVA: source.copy(target, file, file, IOContext.READONCE); + //JAVA: } + //JAVA: } + #endregion + + if (source.Equals(target)) + return; + + foreach (string file in files) + source.Copy(target, file, file, IOContext.READ_ONCE); + } + + /// <summary> + /// Writes <see cref="IndexFileNames.SEGMENTS_GEN"/> file to the directory, reading + /// the generation from the given <code>segmentsFile</code>. If it is <code>null</code>, + /// this method deletes segments.gen from the directory.
+ /// </summary> + /// <param name="segmentsFile">the segments_N file name, or <code>null</code> to delete segments.gen</param> + /// <param name="directory">the index directory to update</param> + public static void WriteSegmentsGen(string segmentsFile, Directory directory) + { + #region Java + //JAVA: public static void writeSegmentsGen(String segmentsFile, Directory dir) { + //JAVA: if (segmentsFile != null) { + //JAVA: SegmentInfos.writeSegmentsGen(dir, SegmentInfos.generationFromSegmentsFileName(segmentsFile)); + //JAVA: } else { + //JAVA: try { + //JAVA: dir.deleteFile(IndexFileNames.SEGMENTS_GEN); + //JAVA: } catch (Throwable t) { + //JAVA: // suppress any errors while deleting this file. + //JAVA: } + //JAVA: } + //JAVA: } + #endregion + + if (segmentsFile != null) + { + SegmentInfos.WriteSegmentsGen(directory, SegmentInfos.GenerationFromSegmentsFileName(segmentsFile)); + return; + } + + try + { + directory.DeleteFile(IndexFileNames.SEGMENTS_GEN); + } + catch + { + // suppress any errors while deleting this file. + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/IndexRevision.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/IndexRevision.cs b/src/Lucene.Net.Replicator/IndexRevision.cs new file mode 100644 index 0000000..930b120 --- /dev/null +++ b/src/Lucene.Net.Replicator/IndexRevision.cs @@ -0,0 +1,200 @@ +//STATUS: DRAFT - 4.8.0 + +using System; +using System.IO; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using Lucene.Net.Index; +using Lucene.Net.Store; +using Directory = Lucene.Net.Store.Directory; + +namespace Lucene.Net.Replicator +{ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /// <summary> + /// A <see cref="IRevision"/> of a single index's files which comprises the list of files + /// that are part of the current <see cref="IndexCommit"/>. To ensure the files are not + /// deleted by <see cref="IndexWriter"/> for as long as this revision stays alive (i.e. + /// until <see cref="Release"/>), the current commit point is snapshotted, using + /// <see cref="SnapshotDeletionPolicy"/> (this means that the given writer's + /// <see cref="IndexWriterConfig.IndexDeletionPolicy"/> should return + /// <see cref="SnapshotDeletionPolicy"/>). + /// <p> + /// When this revision is <see cref="Release"/>d, it releases the obtained + /// snapshot as well as calls <see cref="IndexWriter.DeleteUnusedFiles"/> so that the + /// snapshotted files are deleted (if they are no longer needed).
+ /// </p> + /// </summary> + /// <remarks> + /// Lucene.Experimental + /// </remarks> + public class IndexRevision : IRevision + { + #region Java + //JAVA: private static final int RADIX = 16; + //JAVA: private static final String SOURCE = "index"; + //JAVA: private final IndexWriter writer; + //JAVA: private final IndexCommit commit; + //JAVA: private final SnapshotDeletionPolicy sdp; + //JAVA: private final String version; + //JAVA: private final Map<String, List<RevisionFile>> sourceFiles; + #endregion + + private const string SOURCE = "index"; + + private readonly IndexWriter writer; + private readonly IndexCommit commit; + private readonly SnapshotDeletionPolicy sdp; + + public string Version { get; private set; } + public IDictionary<string, IList<RevisionFile>> SourceFiles { get; private set; } + + public IndexRevision(IndexWriter writer) + { + #region Java + //JAVA: public IndexRevision(IndexWriter writer) throws IOException { + //JAVA: IndexDeletionPolicy delPolicy = writer.getConfig().getIndexDeletionPolicy(); + //JAVA: if (!(delPolicy instanceof SnapshotDeletionPolicy)) { + //JAVA: throw new IllegalArgumentException("IndexWriter must be created with SnapshotDeletionPolicy"); + //JAVA: } + //JAVA: this.writer = writer; + //JAVA: this.sdp = (SnapshotDeletionPolicy) delPolicy; + //JAVA: this.commit = sdp.snapshot(); + //JAVA: this.version = revisionVersion(commit); + //JAVA: this.sourceFiles = revisionFiles(commit); + //JAVA: } + #endregion + + sdp = writer.Config.IndexDeletionPolicy as SnapshotDeletionPolicy; + if (sdp == null) + throw new ArgumentException("IndexWriter must be created with SnapshotDeletionPolicy", "writer"); + + this.writer = writer; + this.commit = sdp.Snapshot(); + this.Version = RevisionVersion(commit); + this.SourceFiles = RevisionFiles(commit); + } + + public int CompareTo(string version) + { + #region Java + //JAVA: long gen = Long.parseLong(version, RADIX); + //JAVA: long commitGen = commit.getGeneration(); + //JAVA: return
commitGen < gen ? -1 : (commitGen > gen ? 1 : 0); + #endregion + long gen = long.Parse(version, NumberStyles.HexNumber, CultureInfo.InvariantCulture); // version is hex (see RevisionVersion); parsing is case-insensitive + long commitGen = commit.Generation; + // explicit -1/0/1 result, mirroring the Java implementation + return commitGen < gen ? -1 : (commitGen > gen ? 1 : 0); + } + + public int CompareTo(IRevision other) + { + #region Java + //JAVA: IndexRevision other = (IndexRevision)o; + //JAVA: return commit.compareTo(other.commit); + #endregion + //TODO: This breaks the contract and will fail if called with a different implementation + // This is a flaw inherited from the original source... + // It should at least provide a better description to the InvalidCastException + IndexRevision or = (IndexRevision)other; + return commit.CompareTo(or.commit); + } + + public Stream Open(string source, string fileName) + { + Debug.Assert(source.Equals(SOURCE), string.Format("invalid source; expected={0} got={1}", SOURCE, source)); + return new IndexInputStream(commit.Directory.OpenInput(fileName, IOContext.READ_ONCE)); + } + + public void Release() + { + sdp.Release(commit); + writer.DeleteUnusedFiles(); + } + + public override string ToString() + { + return "IndexRevision version=" + Version + " files=" + SourceFiles; + } + + // returns a RevisionFile with some metadata + private static RevisionFile CreateRevisionFile(string fileName, Directory directory) + { + #region Java + //JAVA: private static RevisionFile newRevisionFile(String file, Directory dir) throws IOException { + //JAVA: RevisionFile revFile = new RevisionFile(file); + //JAVA: revFile.size = dir.fileLength(file); + //JAVA: return revFile; + //JAVA: } + #endregion + return new RevisionFile(fileName, directory.FileLength(fileName)); + } + + /** Returns a singleton map of the revision files from the given {@link IndexCommit}.
*/ public static IDictionary<string, IList<RevisionFile>> RevisionFiles(IndexCommit commit) + { + #region Java + //JAVA: public static Map<String,List<RevisionFile>> revisionFiles(IndexCommit commit) throws IOException { + //JAVA: Collection<String> commitFiles = commit.getFileNames(); + //JAVA: List<RevisionFile> revisionFiles = new ArrayList<>(commitFiles.size()); + //JAVA: String segmentsFile = commit.getSegmentsFileName(); + //JAVA: Directory dir = commit.getDirectory(); + //JAVA: + //JAVA: for (String file : commitFiles) { + //JAVA: if (!file.equals(segmentsFile)) { + //JAVA: revisionFiles.add(newRevisionFile(file, dir)); + //JAVA: } + //JAVA: } + //JAVA: revisionFiles.add(newRevisionFile(segmentsFile, dir)); // segments_N must be last + //JAVA: return Collections.singletonMap(SOURCE, revisionFiles); + //JAVA: } + #endregion + + List<RevisionFile> revisionFiles = commit.FileNames + .Where(file => !string.Equals(file, commit.SegmentsFileName)) + .Select(file => CreateRevisionFile(file, commit.Directory)) + //Note: segments_N must be last; Concat appends it (Union would needlessly dedupe through a hash set) + .Concat(new[] {CreateRevisionFile(commit.SegmentsFileName, commit.Directory)}) + .ToList(); + return new Dictionary<string, IList<RevisionFile>> + { + { SOURCE, revisionFiles } + }; + } + + /// <summary> + /// Returns a String representation of a revision's version from the given <see cref="IndexCommit"/> + /// </summary> + /// <param name="commit">the commit whose generation identifies the revision</param> + /// <returns>the commit generation as lowercase hexadecimal, matching Java's <code>Long.toString(gen, 16)</code></returns> + public static string RevisionVersion(IndexCommit commit) + { + #region Java + //JAVA: public static String revisionVersion(IndexCommit commit) { + //JAVA: return Long.toString(commit.getGeneration(), RADIX); + //JAVA: } + #endregion + return commit.Generation.ToString("x", CultureInfo.InvariantCulture); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/lucenenet/blob/6da4dd20/src/Lucene.Net.Replicator/LocalReplicator.cs ---------------------------------------------------------------------- diff --git a/src/Lucene.Net.Replicator/LocalReplicator.cs
//STATUS: DRAFT - 4.8.0

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using Lucene.Net.Search;
using Lucene.Net.Support;

namespace Lucene.Net.Replicator
{
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    /// <summary>
    /// A <see cref="IReplicator"/> implementation for use by the side that publishes
    /// <see cref="IRevision"/>s, as well as for clients to <see cref="CheckForUpdate"/>
    /// (check for updates). When a client needs to be updated, it is returned a
    /// <see cref="SessionToken"/> through which it can <see cref="ObtainFile"/> the files
    /// of that revision. As long as a revision is being replicated, this replicator
    /// guarantees that it will not be released (see <see cref="IRevision.Release"/>).
    /// <para>
    /// Replication sessions expire by default after
    /// <see cref="DEFAULT_SESSION_EXPIRATION_THRESHOLD"/> milliseconds, and the threshold can be
    /// configured through <see cref="ExpirationThreshold"/>.
    /// </para>
    /// </summary>
    /// <remarks>
    /// Lucene.Experimental
    /// </remarks>
    public class LocalReplicator : IReplicator
    {
        /// <summary>Threshold, in milliseconds, for expiring inactive sessions. Defaults to 30 minutes.</summary>
        public const long DEFAULT_SESSION_EXPIRATION_THRESHOLD = 1000 * 60 * 30;

        private long expirationThreshold = DEFAULT_SESSION_EXPIRATION_THRESHOLD;
        private readonly object padlock = new object();
        private volatile RefCountedRevision currentRevision;
        private volatile bool disposed = false;

        private readonly AtomicInt32 sessionToken = new AtomicInt32(0);
        private readonly Dictionary<string, ReplicationSession> sessions = new Dictionary<string, ReplicationSession>();

        /// <summary>
        /// Gets or sets the expiration threshold, in milliseconds, after which an inactive
        /// replication session is considered expired. Setting it immediately releases any
        /// sessions that are expired under the new threshold.
        /// </summary>
        /// <exception cref="ObjectDisposedException">Setting the value after this replicator was disposed.</exception>
        public long ExpirationThreshold
        {
            get
            {
                // Lock so the 64-bit read cannot tear on 32-bit runtimes.
                lock (padlock)
                {
                    return expirationThreshold;
                }
            }
            set
            {
                lock (padlock)
                {
                    EnsureOpen();
                    expirationThreshold = value;
                    CheckExpiredSessions();
                }
            }
        }

        /// <summary>
        /// Publishes a new revision. Publishing a revision equal to the current one releases
        /// the given revision and is otherwise ignored. After a successful publish the previous
        /// revision loses the reference held by this replicator, and expired sessions are released.
        /// </summary>
        /// <param name="revision">The revision to publish; must not be <c>null</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="revision"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentException">The revision is older than the currently published one (it is released before throwing).</exception>
        /// <exception cref="ObjectDisposedException">This replicator was disposed.</exception>
        public void Publish(IRevision revision)
        {
            #region Java
            //JAVA: public synchronized void publish(Revision revision) throws IOException {
            //JAVA:   ensureOpen();
            //JAVA:   if (currentRevision != null) {
            //JAVA:     int compare = revision.compareTo(currentRevision.revision);
            //JAVA:     if (compare == 0) {
            //JAVA:       // same revision published again, ignore but release it
            //JAVA:       revision.release();
            //JAVA:       return;
            //JAVA:     }
            //JAVA:
            //JAVA:     if (compare < 0) {
            //JAVA:       revision.release();
            //JAVA:       throw new IllegalArgumentException("Cannot publish an older revision: rev=" + revision + " current="
            //JAVA:           + currentRevision);
            //JAVA:     }
            //JAVA:   }
            //JAVA:
            //JAVA:   // swap revisions
            //JAVA:   final RefCountedRevision oldRevision = currentRevision;
            //JAVA:   currentRevision = new RefCountedRevision(revision);
            //JAVA:   if (oldRevision != null) {
            //JAVA:     oldRevision.decRef();
            //JAVA:   }
            //JAVA:
            //JAVA:   // check for expired sessions
            //JAVA:   checkExpiredSessions();
            //JAVA: }
            #endregion

            // Without this guard a null revision would be silently wrapped when nothing is
            // published yet, and only fail later on first use.
            if (revision == null)
            {
                throw new ArgumentNullException("revision");
            }

            lock (padlock)
            {
                EnsureOpen();

                if (currentRevision != null)
                {
                    int compare = revision.CompareTo(currentRevision.Revision);
                    if (compare == 0)
                    {
                        // same revision published again, ignore but release it
                        revision.Release();
                        return;
                    }

                    if (compare < 0)
                    {
                        revision.Release();
                        throw new ArgumentException(string.Format("Cannot publish an older revision: rev={0} current={1}", revision, currentRevision), "revision");
                    }
                }

                // swap revisions
                RefCountedRevision oldRevision = currentRevision;
                currentRevision = new RefCountedRevision(revision);
                if (oldRevision != null)
                {
                    oldRevision.DecRef();
                }

                // check for expired sessions
                CheckExpiredSessions();
            }
        }

        /// <summary>
        /// Checks whether a revision newer than <paramref name="currentVersion"/> is published and,
        /// if so, opens a new replication session for it.
        /// </summary>
        /// <param name="currentVersion">The client's current version, or <c>null</c> if it has none.</param>
        /// <returns>
        /// <c>null</c> if nothing was published yet or <paramref name="currentVersion"/> is newer than or
        /// equal to the published revision; otherwise a new <see cref="SessionToken"/>, which holds a
        /// reference on the published revision until the session is released or expires.
        /// </returns>
        /// <exception cref="ObjectDisposedException">This replicator was disposed.</exception>
        public SessionToken CheckForUpdate(string currentVersion)
        {
            #region Java
            //JAVA: public synchronized SessionToken checkForUpdate(String currentVersion) {
            //JAVA:   ensureOpen();
            //JAVA:   if (currentRevision == null) { // no published revisions yet
            //JAVA:     return null;
            //JAVA:   }
            //JAVA:
            //JAVA:   if (currentVersion != null && currentRevision.revision.compareTo(currentVersion) <= 0) {
            //JAVA:     // currentVersion is newer or equal to latest published revision
            //JAVA:     return null;
            //JAVA:   }
            //JAVA:
            //JAVA:   // currentVersion is either null or older than latest published revision
            //JAVA:   currentRevision.incRef();
            //JAVA:   final String sessionID = Integer.toString(sessionToken.incrementAndGet());
            //JAVA:   final SessionToken sessionToken = new SessionToken(sessionID, currentRevision.revision);
            //JAVA:   final ReplicationSession timedSessionToken = new ReplicationSession(sessionToken, currentRevision);
            //JAVA:   sessions.put(sessionID, timedSessionToken);
            //JAVA:   return sessionToken;
            //JAVA: }
            #endregion

            lock (padlock)
            {
                EnsureOpen();
                if (currentRevision == null)
                {
                    return null; // no published revisions yet
                }

                if (currentVersion != null && currentRevision.Revision.CompareTo(currentVersion) <= 0)
                {
                    return null; // currentVersion is newer or equal to latest published revision
                }

                // currentVersion is either null or older than latest published revision
                currentRevision.IncRef();

                // Invariant culture: the session id is a protocol token, not display text.
                string sessionID = sessionToken.IncrementAndGet().ToString(CultureInfo.InvariantCulture);
                SessionToken token = new SessionToken(sessionID, currentRevision.Revision);
                sessions[sessionID] = new ReplicationSession(token, currentRevision);
                return token;
            }
        }

        /// <summary>
        /// Releases the replication session with the given id, dropping its reference on the
        /// revision it replicates. Unknown (e.g. already expired) session ids are ignored.
        /// </summary>
        /// <param name="sessionId">The id of the session to release.</param>
        /// <exception cref="ObjectDisposedException">This replicator was disposed.</exception>
        /// <exception cref="InvalidOperationException">The revision's reference count was already exhausted.</exception>
        public void Release(string sessionId)
        {
            lock (padlock)
            {
                EnsureOpen();
                ReleaseSession(sessionId);
            }
        }

        /// <summary>
        /// Opens a file of the revision replicated by the given session and marks the session as accessed.
        /// </summary>
        /// <param name="sessionId">The id of an active replication session.</param>
        /// <param name="source">The source (within the revision) the file belongs to.</param>
        /// <param name="fileName">The name of the file to open.</param>
        /// <returns>The stream returned by the revision's <c>Open</c>.</returns>
        /// <exception cref="SessionExpiredException">The session is unknown, previously expired, or expired now.</exception>
        /// <exception cref="ObjectDisposedException">This replicator was disposed.</exception>
        public Stream ObtainFile(string sessionId, string source, string fileName)
        {
            #region Java
            //JAVA: public synchronized InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
            //JAVA:   ensureOpen();
            //JAVA:   ReplicationSession session = sessions.get(sessionID);
            //JAVA:   if (session != null && session.isExpired(expirationThresholdMilllis)) {
            //JAVA:     releaseSession(sessionID);
            //JAVA:     session = null;
            //JAVA:   }
            //JAVA:   // session either previously expired, or we just expired it
            //JAVA:   if (session == null) {
            //JAVA:     throw new SessionExpiredException("session (" + sessionID + ") expired while obtaining file: source=" + source
            //JAVA:         + " file=" + fileName);
            //JAVA:   }
            //JAVA:   sessions.get(sessionID).markAccessed();
            //JAVA:   return session.revision.revision.open(source, fileName);
            //JAVA: }
            #endregion

            lock (padlock)
            {
                EnsureOpen();

                // Java's Map.get returns null for a missing key, whereas the Dictionary indexer throws
                // KeyNotFoundException; TryGetValue keeps the SessionExpiredException contract for
                // sessions that were already released or expired.
                ReplicationSession session;
                if (sessions.TryGetValue(sessionId, out session) && session.IsExpired(expirationThreshold))
                {
                    ReleaseSession(sessionId);
                    session = null;
                }

                // session either previously expired, or we just expired it
                if (session == null)
                {
                    throw new SessionExpiredException(string.Format("session ({0}) expired while obtaining file: source={1} file={2}", sessionId, source, fileName));
                }

                session.MarkAccessed();
                return session.Revision.Revision.Open(source, fileName);
            }
        }

        /// <summary>
        /// Releases the references held by all open sessions and marks this replicator disposed.
        /// Calling it more than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            #region Java
            //JAVA: public synchronized void close() throws IOException {
            //JAVA:   if (!closed) {
            //JAVA:     // release all managed revisions
            //JAVA:     for (ReplicationSession session : sessions.values()) {
            //JAVA:       session.revision.decRef();
            //JAVA:     }
            //JAVA:     sessions.clear();
            //JAVA:     closed = true;
            //JAVA:   }
            //JAVA: }
            #endregion

            // Check and set the flag under the lock (as the Java synchronized method does) so that
            // concurrent Dispose/Release calls cannot interleave with the release loop.
            lock (padlock)
            {
                if (disposed)
                {
                    return;
                }

                // release all managed revisions
                foreach (ReplicationSession session in sessions.Values)
                {
                    session.Revision.DecRef();
                }
                sessions.Clear();
                disposed = true;
            }
        }

        /// <summary>
        /// Releases every session that is expired under the current threshold. Callers hold <c>padlock</c>.
        /// </summary>
        /// <exception cref="InvalidOperationException"></exception>
        private void CheckExpiredSessions()
        {
            #region Java
            //JAVA: private void checkExpiredSessions() throws IOException {
            //JAVA:   // make a "to-delete" list so we don't risk deleting from the map while iterating it
            //JAVA:   final ArrayList<ReplicationSession> toExpire = new ArrayList<>();
            //JAVA:   for (ReplicationSession token : sessions.values()) {
            //JAVA:     if (token.isExpired(expirationThresholdMilllis)) {
            //JAVA:       toExpire.add(token);
            //JAVA:     }
            //JAVA:   }
            //JAVA:   for (ReplicationSession token : toExpire) {
            //JAVA:     releaseSession(token.session.id);
            //JAVA:   }
            //JAVA: }
            #endregion

            // .NET NOTE: .ToArray() so we don't modify a collection we are enumerating.
            ReplicationSession[] toExpire = sessions.Values.Where(token => token.IsExpired(expirationThreshold)).ToArray();
            foreach (ReplicationSession token in toExpire)
            {
                ReleaseSession(token.Session.Id);
            }
        }

        /// <summary>
        /// Removes the session with the given id (if present) and drops its reference on its revision.
        /// Callers hold <c>padlock</c>.
        /// </summary>
        /// <exception cref="InvalidOperationException"></exception>
        private void ReleaseSession(string sessionId)
        {
            #region Java
            //JAVA: private void releaseSession(String sessionID) throws IOException {
            //JAVA:   ReplicationSession session = sessions.remove(sessionID);
            //JAVA:   // if we're called concurrently by close() and release(), could be that one
            //JAVA:   // thread beats the other to release the session.
            //JAVA:   if (session != null) {
            //JAVA:     session.revision.decRef();
            //JAVA:   }
            //JAVA: }
            #endregion

            ReplicationSession session;
            // if we're called concurrently by close() and release(), could be that one
            // thread beats the other to release the session.
            if (sessions.TryGetValue(sessionId, out session))
            {
                sessions.Remove(sessionId);
                session.Revision.DecRef();
            }
        }

        /// <summary>
        /// Ensure that replicator is still open, or throw <see cref="ObjectDisposedException"/> otherwise.
        /// </summary>
        /// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception>
        protected void EnsureOpen()
        {
            lock (padlock)
            {
                if (disposed)
                {
                    // The single-string constructor takes the object *name*, not the message;
                    // use the (objectName, message) overload so the message reads correctly.
                    throw new ObjectDisposedException(GetType().FullName, "This replicator has already been disposed");
                }
            }
        }

        /// <summary>
        /// Wraps an <see cref="IRevision"/> with a reference count, releasing the revision when the
        /// count drops to zero. The count starts at 1 (the reference held by the creator).
        /// </summary>
        private sealed class RefCountedRevision
        {
            private readonly AtomicInt32 refCount = new AtomicInt32(1);

            public IRevision Revision { get; private set; }

            public RefCountedRevision(IRevision revision)
            {
                Revision = revision;
            }

            /// <summary>
            /// Decrements the reference count and releases the revision when it reaches zero.
            /// If the release fails, the reference is restored.
            /// </summary>
            /// <exception cref="InvalidOperationException">The revision was already released, or too many calls were made.</exception>
            public void DecRef()
            {
                if (refCount.Get() <= 0)
                {
                    //JAVA: throw new IllegalStateException("this revision is already released");
                    throw new InvalidOperationException("this revision is already released");
                }

                var rc = refCount.DecrementAndGet();
                if (rc == 0)
                {
                    bool success = false;
                    try
                    {
                        Revision.Release();
                        success = true;
                    }
                    finally
                    {
                        if (!success)
                        {
                            // Put reference back on failure
                            refCount.IncrementAndGet();
                        }
                    }
                }
                else if (rc < 0)
                {
                    //JAVA: throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement");
                    throw new InvalidOperationException(string.Format("too many decRef calls: refCount is {0} after decrement", rc));
                }
            }

            /// <summary>Increments the reference count.</summary>
            public void IncRef()
            {
                refCount.IncrementAndGet();
            }
        }

        /// <summary>
        /// A replication session: the token handed to the client, the revision it replicates,
        /// and the <see cref="Stopwatch"/> timestamp of its last access.
        /// </summary>
        private sealed class ReplicationSession
        {
            public SessionToken Session { get; private set; }
            public RefCountedRevision Revision { get; private set; }

            private long lastAccessTime;

            public ReplicationSession(SessionToken session, RefCountedRevision revision)
            {
                Session = session;
                Revision = revision;
                //JAVA: lastAccessTime = System.currentTimeMillis();
                lastAccessTime = Stopwatch.GetTimestamp();
            }

            /// <summary>
            /// Returns <c>true</c> if more than <paramref name="expirationThreshold"/> milliseconds
            /// have elapsed since the last access.
            /// </summary>
            public bool IsExpired(long expirationThreshold)
            {
                //JAVA: return lastAccessTime < (System.currentTimeMillis() - expirationThreshold);
                // Compare elapsed milliseconds instead of scaling the threshold into Stopwatch ticks:
                // expirationThreshold * Stopwatch.Frequency overflows long for large thresholds
                // (e.g. long.MaxValue to effectively disable expiration).
                double elapsedMillis = (Stopwatch.GetTimestamp() - lastAccessTime) * 1000.0 / Stopwatch.Frequency;
                return elapsedMillis > expirationThreshold;
            }

            /// <summary>Records the current time as the session's last access time.</summary>
            public void MarkAccessed()
            {
                //JAVA: lastAccessTime = System.currentTimeMillis();
                lastAccessTime = Stopwatch.GetTimestamp();
            }
        }
    }
}
