This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d50c592fb0 SOLR-17030: Open TransactionLog for extension. (#2012)
6d50c592fb0 is described below

commit 6d50c592fb0b7e0ea2e52ecf1cde7e882e1d0d0a
Author: Bruno Roustant <[email protected]>
AuthorDate: Thu Oct 19 09:45:16 2023 +0200

    SOLR-17030: Open TransactionLog for extension. (#2012)
---
 .../org/apache/solr/update/TransactionLog.java     | 131 +++++++++++++++------
 .../src/java/org/apache/solr/update/UpdateLog.java |  16 ++-
 2 files changed, 106 insertions(+), 41 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java 
b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 5c28297fa09..3174324f859 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -18,6 +18,7 @@ package org.apache.solr.update;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
 import java.nio.ByteBuffer;
@@ -63,17 +64,18 @@ import org.slf4j.LoggerFactory;
  */
 public class TransactionLog implements Closeable {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private boolean debug = log.isDebugEnabled();
-  private boolean trace = log.isTraceEnabled();
+  private static final boolean debug = log.isDebugEnabled();
+  private static final boolean trace = log.isTraceEnabled();
 
   public static final String END_MESSAGE = "SOLR_TLOG_END";
 
   long id;
-  Path tlog;
-  FileChannel channel;
-  OutputStream os;
+  protected Path tlog;
+  protected FileChannel channel;
+  protected OutputStream os;
   // all accesses to this stream should be synchronized on "this" (The 
TransactionLog)
   protected FastOutputStream fos;
+  protected ChannelInputStreamOpener channelInputStreamOpener;
   int numRecords;
   public boolean isBuffer;
 
@@ -104,6 +106,12 @@ public class TransactionLog implements Closeable {
         }
       };
 
+  protected static final OutputStreamOpener OUTPUT_STREAM_OPENER =
+      (channel, position) -> Channels.newOutputStream(channel);
+
+  protected static final ChannelInputStreamOpener CHANNEL_INPUT_STREAM_OPENER =
+      ChannelFastInputStream::new;
+
   public class LogCodec extends JavaBinCodec {
 
     public LogCodec(JavaBinCodec.ObjectResolver resolver) {
@@ -168,9 +176,19 @@ public class TransactionLog implements Closeable {
   }
 
   TransactionLog(Path tlogFile, Collection<String> globalStrings, boolean 
openExisting) {
+    this(tlogFile, globalStrings, openExisting, OUTPUT_STREAM_OPENER, 
CHANNEL_INPUT_STREAM_OPENER);
+  }
+
+  protected TransactionLog(
+      Path tlogFile,
+      Collection<String> globalStrings,
+      boolean openExisting,
+      OutputStreamOpener outputStreamOpener,
+      ChannelInputStreamOpener channelInputStreamOpener) {
     boolean success = false;
     try {
       this.tlog = tlogFile;
+      this.channelInputStreamOpener = channelInputStreamOpener;
 
       if (debug) {
         log.debug(
@@ -190,14 +208,14 @@ public class TransactionLog implements Closeable {
 
         long start = Files.size(tlog);
         channel = FileChannel.open(tlog, StandardOpenOption.READ, 
StandardOpenOption.WRITE);
-        os = Channels.newOutputStream(channel);
-        fos = new FastOutputStream(os, new byte[65536], 0);
-
         if (start > 0) {
           readHeader(null);
+        }
+        os = outputStreamOpener.open(channel, start);
+        fos = new FastOutputStream(os, new byte[65536], 0);
+        if (start > 0) {
           channel.position(start);
-          fos.setWritten(start); // reflect that we aren't starting at the 
beginning
-          assert fos.size() == channel.size();
+          setWrittenCount(start);
         } else {
           addGlobalStrings(globalStrings);
         }
@@ -213,7 +231,7 @@ public class TransactionLog implements Closeable {
                 StandardOpenOption.READ,
                 StandardOpenOption.WRITE,
                 StandardOpenOption.CREATE_NEW);
-        os = Channels.newOutputStream(channel);
+        os = outputStreamOpener.open(channel, 0);
         fos = new FastOutputStream(os, new byte[65536], 0);
 
         addGlobalStrings(globalStrings);
@@ -239,6 +257,20 @@ public class TransactionLog implements Closeable {
   // for subclasses
   protected TransactionLog() {}
 
+  /**
+   * Sets the counter of written data in the {@link FastOutputStream} view of 
the log file, to
+   * reflect that we aren't starting at the beginning.
+   */
+  protected void setWrittenCount(long fileStartOffset) throws IOException {
+    fos.setWritten(fileStartOffset);
+    assert fos.size() == getLogFileSize();
+  }
+
+  /** Gets the log file data size. */
+  protected long getLogFileSize() throws IOException {
+    return channel.size();
+  }
+
   /**
    * Returns the number of records in the log (currently includes the header 
and an optional
    * commit). Note: currently returns 0 for reopened existing log files.
@@ -261,8 +293,11 @@ public class TransactionLog implements Closeable {
     long pos = size - END_MESSAGE.length() - 4;
     if (pos < 0) return false;
     @SuppressWarnings("resource")
-    final ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
-    is.read(buf);
+    final InputStream is = channelInputStreamOpener.open(channel, pos);
+    int n = is.read(buf);
+    if (n != buf.length) {
+      return false;
+    }
     for (int i = 0; i < buf.length; i++) {
       if (buf[i] != END_MESSAGE.charAt(i)) return false;
     }
@@ -283,14 +318,14 @@ public class TransactionLog implements Closeable {
   }
 
   @SuppressWarnings({"unchecked"})
-  private void readHeader(FastInputStream fis) throws IOException {
+  private void readHeader(DataInputInputStream is) throws IOException {
     // read existing header
-    fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
+    is = is != null ? is : channelInputStreamOpener.open(channel, 0);
     @SuppressWarnings("resource")
     final LogCodec codec = new LogCodec(resolver);
-    Map<?, ?> header = (Map<?, ?>) codec.unmarshal(fis);
+    Map<?, ?> header = (Map<?, ?>) codec.unmarshal(is);
 
-    fis.readInt(); // skip size
+    is.readInt(); // skip size
 
     // needed to read other records
 
@@ -507,7 +542,7 @@ public class TransactionLog implements Closeable {
         endRecord(pos);
 
         fos.flush(); // flush since this will be the last record in a log fill
-        assert fos.size() == channel.size();
+        assert fos.size() == getLogFileSize();
 
         return pos;
       } catch (IOException e) {
@@ -527,18 +562,18 @@ public class TransactionLog implements Closeable {
       // make sure any unflushed buffer has been flushed
       synchronized (this) {
         // TODO: optimize this by keeping track of what we have flushed up to
-        fos.flushBuffer();
+        fos.flush();
         /*
-        System.out.println("###flushBuffer to " + fos.size() + " 
raf.length()=" + raf.length() + " pos="+pos);
+        System.out.println("###flush to " + fos.size() + " raf.length()=" + 
raf.length() + " pos="+pos);
         if (fos.size() != raf.length() || pos >= fos.size() ) {
-          throw new RuntimeException("ERROR" + "###flushBuffer to " + 
fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+          throw new RuntimeException("ERROR" + "###flush to " + fos.size() + " 
raf.length()=" + raf.length() + " pos="+pos);
         }
         */
       }
 
-      ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+      DataInputInputStream is = channelInputStreamOpener.open(channel, pos);
       try (LogCodec codec = new LogCodec(resolver)) {
-        return codec.readVal(fis);
+        return codec.readVal(is);
       }
     } catch (IOException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -580,7 +615,7 @@ public class TransactionLog implements Closeable {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
     try {
       synchronized (this) {
-        fos.flushBuffer();
+        fos.flush();
       }
 
       if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
@@ -657,11 +692,11 @@ public class TransactionLog implements Closeable {
    * Returns a reader that can be used while a log is still in use. Currently 
only *one* LogReader
    * may be outstanding, and that log may only be used from a single thread.
    */
-  public LogReader getReader(long startingPos) {
+  public LogReader getReader(long startingPos) throws IOException {
     return new LogReader(startingPos);
   }
 
-  public LogReader getSortedReader(long startingPos) {
+  public LogReader getSortedReader(long startingPos) throws IOException {
     return new SortedLogReader(startingPos);
   }
 
@@ -674,9 +709,9 @@ public class TransactionLog implements Closeable {
     protected ChannelFastInputStream fis;
     private LogCodec codec = new LogCodec(resolver);
 
-    public LogReader(long startingPos) {
+    public LogReader(long startingPos) throws IOException {
       incref();
-      fis = new ChannelFastInputStream(channel, startingPos);
+      fis = channelInputStreamOpener.open(channel, startingPos);
     }
 
     // for classes that extend
@@ -700,7 +735,7 @@ public class TransactionLog implements Closeable {
           return null;
         }
 
-        fos.flushBuffer();
+        fos.flush();
       }
 
       if (pos == 0) {
@@ -752,7 +787,7 @@ public class TransactionLog implements Closeable {
     // returns best effort current size
     // for info purposes
     public long currentSize() throws IOException {
-      return channel.size();
+      return getLogFileSize();
     }
   }
 
@@ -762,7 +797,7 @@ public class TransactionLog implements Closeable {
     private TreeMap<Long, Long> versionToPos;
     Iterator<Long> iterator;
 
-    public SortedLogReader(long startingPos) {
+    public SortedLogReader(long startingPos) throws IOException {
       super(startingPos);
       this.startingPos = startingPos;
     }
@@ -841,12 +876,12 @@ public class TransactionLog implements Closeable {
 
       long sz;
       synchronized (TransactionLog.this) {
-        fos.flushBuffer();
+        fos.flush();
         sz = fos.size();
-        assert sz == channel.size();
+        assert sz == getLogFileSize();
       }
 
-      fis = new ChannelFastInputStream(channel, 0);
+      fis = channelInputStreamOpener.open(channel, 0);
       if (sz >= 4) {
         // readHeader(fis);  // should not be needed
         prevPos = sz - 4;
@@ -927,8 +962,8 @@ public class TransactionLog implements Closeable {
     }
   }
 
-  static class ChannelFastInputStream extends FastInputStream {
-    private FileChannel ch;
+  public static class ChannelFastInputStream extends FastInputStream {
+    protected FileChannel ch;
 
     public ChannelFastInputStream(FileChannel ch, long chPosition) {
       // super(null, new byte[10],0,0);    // a small buffer size for testing 
purposes
@@ -987,4 +1022,28 @@ public class TransactionLog implements Closeable {
           + position();
     }
   }
+
+  /** Opens {@link OutputStream} from {@link FileChannel}. */
+  protected interface OutputStreamOpener {
+
+    /**
+     * Opens an {@link OutputStream} to write in a {@link FileChannel}.
+     *
+     * @param position The initial write position of the {@link OutputStream} 
view of the {@link
+     *     FileChannel}.
+     */
+    OutputStream open(FileChannel channel, long position) throws IOException;
+  }
+
+  /** Opens {@link ChannelFastInputStream} from {@link FileChannel}. */
+  protected interface ChannelInputStreamOpener {
+
+    /**
+     * Opens a {@link ChannelFastInputStream} to read a {@link FileChannel}.
+     *
+     * @param position The initial read position of the {@link InputStream} 
view of the {@link
+     *     FileChannel}.
+     */
+    ChannelFastInputStream open(FileChannel channel, long position) throws 
IOException;
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java 
b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 1b29405f747..0b008a43d74 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -1343,9 +1344,10 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
     copyOverOldUpdatesMeter.mark();
 
     SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, new 
ModifiableSolrParams());
-    TransactionLog.LogReader logReader = oldTlog.getReader(0);
+    TransactionLog.LogReader logReader = null;
     Object o = null;
     try {
+      logReader = oldTlog.getReader(0);
       while ((o = logReader.next()) != null) {
         try {
           List<?> entry = (List<?>) o;
@@ -1921,10 +1923,14 @@ public class UpdateLog implements 
PluginInfoInitialized, SolrMetricProducer {
             recoveryInfo.positionOfStart,
             inSortedOrder);
         long lastStatusTime = System.nanoTime();
-        if (inSortedOrder) {
-          tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
-        } else {
-          tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+        try {
+          if (inSortedOrder) {
+            tlogReader = 
translog.getSortedReader(recoveryInfo.positionOfStart);
+          } else {
+            tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+          }
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
         }
 
         // NOTE: we don't currently handle a core reload during recovery.  
This would cause the core

Reply via email to