This is an automated email from the ASF dual-hosted git repository.
broustant pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 6d50c592fb0 SOLR-17030: Open TransactionLog for extension. (#2012)
6d50c592fb0 is described below
commit 6d50c592fb0b7e0ea2e52ecf1cde7e882e1d0d0a
Author: Bruno Roustant <[email protected]>
AuthorDate: Thu Oct 19 09:45:16 2023 +0200
SOLR-17030: Open TransactionLog for extension. (#2012)
---
.../org/apache/solr/update/TransactionLog.java | 131 +++++++++++++++------
.../src/java/org/apache/solr/update/UpdateLog.java | 16 ++-
2 files changed, 106 insertions(+), 41 deletions(-)
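For readers skimming the patch: it opens TransactionLog for subclassing by making key fields protected, adding a protected constructor, and introducing two hook interfaces, OutputStreamOpener and ChannelInputStreamOpener, that control how the write and read views of the log's FileChannel are created. The sketch below is a hypothetical illustration of how an extension could use those hooks; it is not part of this commit, and the class name AuditingTransactionLog, its package placement, and the byte-counting wrapper are assumptions made for the example only.

    // Hypothetical example only; assumed to live in org.apache.solr.update so the
    // protected constructor and nested opener interfaces are accessible.
    package org.apache.solr.update;

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.file.Path;
    import java.util.Collection;
    import java.util.concurrent.atomic.AtomicLong;

    public class AuditingTransactionLog extends TransactionLog {

      // Tracks bytes written through the OutputStream view of the channel (illustrative).
      private static final AtomicLong BYTES_WRITTEN = new AtomicLong();

      protected AuditingTransactionLog(
          Path tlogFile, Collection<String> globalStrings, boolean openExisting) {
        super(
            tlogFile,
            globalStrings,
            openExisting,
            // OutputStreamOpener: decorate the raw channel output to observe writes.
            (channel, position) ->
                new FilterOutputStream(Channels.newOutputStream(channel)) {
                  @Override
                  public void write(int b) throws IOException {
                    super.write(b);
                    BYTES_WRITTEN.incrementAndGet();
                  }
                },
            // ChannelInputStreamOpener: keep the default ChannelFastInputStream behavior.
            ChannelFastInputStream::new);
      }
    }

Because the openers may fail while a reader is constructed, getReader and getSortedReader declare IOException in this patch, and UpdateLog wraps that in an UncheckedIOException where the surrounding code cannot rethrow it.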
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 5c28297fa09..3174324f859 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -18,6 +18,7 @@ package org.apache.solr.update;
import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
@@ -63,17 +64,18 @@ import org.slf4j.LoggerFactory;
*/
public class TransactionLog implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private boolean debug = log.isDebugEnabled();
- private boolean trace = log.isTraceEnabled();
+ private static final boolean debug = log.isDebugEnabled();
+ private static final boolean trace = log.isTraceEnabled();
public static final String END_MESSAGE = "SOLR_TLOG_END";
long id;
- Path tlog;
- FileChannel channel;
- OutputStream os;
+ protected Path tlog;
+ protected FileChannel channel;
+ protected OutputStream os;
// all accesses to this stream should be synchronized on "this" (The TransactionLog)
protected FastOutputStream fos;
+ protected ChannelInputStreamOpener channelInputStreamOpener;
int numRecords;
public boolean isBuffer;
@@ -104,6 +106,12 @@ public class TransactionLog implements Closeable {
}
};
+ protected static final OutputStreamOpener OUTPUT_STREAM_OPENER =
+ (channel, position) -> Channels.newOutputStream(channel);
+
+ protected static final ChannelInputStreamOpener CHANNEL_INPUT_STREAM_OPENER =
+ ChannelFastInputStream::new;
+
public class LogCodec extends JavaBinCodec {
public LogCodec(JavaBinCodec.ObjectResolver resolver) {
@@ -168,9 +176,19 @@ public class TransactionLog implements Closeable {
}
TransactionLog(Path tlogFile, Collection<String> globalStrings, boolean openExisting) {
+ this(tlogFile, globalStrings, openExisting, OUTPUT_STREAM_OPENER, CHANNEL_INPUT_STREAM_OPENER);
+ }
+
+ protected TransactionLog(
+ Path tlogFile,
+ Collection<String> globalStrings,
+ boolean openExisting,
+ OutputStreamOpener outputStreamOpener,
+ ChannelInputStreamOpener channelInputStreamOpener) {
boolean success = false;
try {
this.tlog = tlogFile;
+ this.channelInputStreamOpener = channelInputStreamOpener;
if (debug) {
log.debug(
@@ -190,14 +208,14 @@ public class TransactionLog implements Closeable {
long start = Files.size(tlog);
channel = FileChannel.open(tlog, StandardOpenOption.READ, StandardOpenOption.WRITE);
- os = Channels.newOutputStream(channel);
- fos = new FastOutputStream(os, new byte[65536], 0);
-
if (start > 0) {
readHeader(null);
+ }
+ os = outputStreamOpener.open(channel, start);
+ fos = new FastOutputStream(os, new byte[65536], 0);
+ if (start > 0) {
channel.position(start);
- fos.setWritten(start); // reflect that we aren't starting at the beginning
- assert fos.size() == channel.size();
+ setWrittenCount(start);
} else {
addGlobalStrings(globalStrings);
}
@@ -213,7 +231,7 @@ public class TransactionLog implements Closeable {
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW);
- os = Channels.newOutputStream(channel);
+ os = outputStreamOpener.open(channel, 0);
fos = new FastOutputStream(os, new byte[65536], 0);
addGlobalStrings(globalStrings);
@@ -239,6 +257,20 @@ public class TransactionLog implements Closeable {
// for subclasses
protected TransactionLog() {}
+ /**
+ * Sets the counter of written data in the {@link FastOutputStream} view of the log file, to
+ * reflect that we aren't starting at the beginning.
+ */
+ protected void setWrittenCount(long fileStartOffset) throws IOException {
+ fos.setWritten(fileStartOffset);
+ assert fos.size() == getLogFileSize();
+ }
+
+ /** Gets the log file data size. */
+ protected long getLogFileSize() throws IOException {
+ return channel.size();
+ }
+
/**
* Returns the number of records in the log (currently includes the header and an optional
* commit). Note: currently returns 0 for reopened existing log files.
@@ -261,8 +293,11 @@ public class TransactionLog implements Closeable {
long pos = size - END_MESSAGE.length() - 4;
if (pos < 0) return false;
@SuppressWarnings("resource")
- final ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
- is.read(buf);
+ final InputStream is = channelInputStreamOpener.open(channel, pos);
+ int n = is.read(buf);
+ if (n != buf.length) {
+ return false;
+ }
for (int i = 0; i < buf.length; i++) {
if (buf[i] != END_MESSAGE.charAt(i)) return false;
}
@@ -283,14 +318,14 @@ public class TransactionLog implements Closeable {
}
@SuppressWarnings({"unchecked"})
- private void readHeader(FastInputStream fis) throws IOException {
+ private void readHeader(DataInputInputStream is) throws IOException {
// read existing header
- fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
+ is = is != null ? is : channelInputStreamOpener.open(channel, 0);
@SuppressWarnings("resource")
final LogCodec codec = new LogCodec(resolver);
- Map<?, ?> header = (Map<?, ?>) codec.unmarshal(fis);
+ Map<?, ?> header = (Map<?, ?>) codec.unmarshal(is);
- fis.readInt(); // skip size
+ is.readInt(); // skip size
// needed to read other records
@@ -507,7 +542,7 @@ public class TransactionLog implements Closeable {
endRecord(pos);
fos.flush(); // flush since this will be the last record in a log fill
- assert fos.size() == channel.size();
+ assert fos.size() == getLogFileSize();
return pos;
} catch (IOException e) {
@@ -527,18 +562,18 @@ public class TransactionLog implements Closeable {
// make sure any unflushed buffer has been flushed
synchronized (this) {
// TODO: optimize this by keeping track of what we have flushed up to
- fos.flushBuffer();
+ fos.flush();
/*
- System.out.println("###flushBuffer to " + fos.size() + "
raf.length()=" + raf.length() + " pos="+pos);
+ System.out.println("###flush to " + fos.size() + " raf.length()=" +
raf.length() + " pos="+pos);
if (fos.size() != raf.length() || pos >= fos.size() ) {
- throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
+ throw new RuntimeException("ERROR" + "###flush to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
}
*/
}
- ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
+ DataInputInputStream is = channelInputStreamOpener.open(channel, pos);
try (LogCodec codec = new LogCodec(resolver)) {
- return codec.readVal(fis);
+ return codec.readVal(is);
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -580,7 +615,7 @@ public class TransactionLog implements Closeable {
if (syncLevel == UpdateLog.SyncLevel.NONE) return;
try {
synchronized (this) {
- fos.flushBuffer();
+ fos.flush();
}
if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
@@ -657,11 +692,11 @@ public class TransactionLog implements Closeable {
* Returns a reader that can be used while a log is still in use. Currently only *one* LogReader
* may be outstanding, and that log may only be used from a single thread.
*/
- public LogReader getReader(long startingPos) {
+ public LogReader getReader(long startingPos) throws IOException {
return new LogReader(startingPos);
}
- public LogReader getSortedReader(long startingPos) {
+ public LogReader getSortedReader(long startingPos) throws IOException {
return new SortedLogReader(startingPos);
}
@@ -674,9 +709,9 @@ public class TransactionLog implements Closeable {
protected ChannelFastInputStream fis;
private LogCodec codec = new LogCodec(resolver);
- public LogReader(long startingPos) {
+ public LogReader(long startingPos) throws IOException {
incref();
- fis = new ChannelFastInputStream(channel, startingPos);
+ fis = channelInputStreamOpener.open(channel, startingPos);
}
// for classes that extend
@@ -700,7 +735,7 @@ public class TransactionLog implements Closeable {
return null;
}
- fos.flushBuffer();
+ fos.flush();
}
if (pos == 0) {
@@ -752,7 +787,7 @@ public class TransactionLog implements Closeable {
// returns best effort current size
// for info purposes
public long currentSize() throws IOException {
- return channel.size();
+ return getLogFileSize();
}
}
@@ -762,7 +797,7 @@ public class TransactionLog implements Closeable {
private TreeMap<Long, Long> versionToPos;
Iterator<Long> iterator;
- public SortedLogReader(long startingPos) {
+ public SortedLogReader(long startingPos) throws IOException {
super(startingPos);
this.startingPos = startingPos;
}
@@ -841,12 +876,12 @@ public class TransactionLog implements Closeable {
long sz;
synchronized (TransactionLog.this) {
- fos.flushBuffer();
+ fos.flush();
sz = fos.size();
- assert sz == channel.size();
+ assert sz == getLogFileSize();
}
- fis = new ChannelFastInputStream(channel, 0);
+ fis = channelInputStreamOpener.open(channel, 0);
if (sz >= 4) {
// readHeader(fis); // should not be needed
prevPos = sz - 4;
@@ -927,8 +962,8 @@ public class TransactionLog implements Closeable {
}
}
- static class ChannelFastInputStream extends FastInputStream {
- private FileChannel ch;
+ public static class ChannelFastInputStream extends FastInputStream {
+ protected FileChannel ch;
public ChannelFastInputStream(FileChannel ch, long chPosition) {
// super(null, new byte[10],0,0); // a small buffer size for testing purposes
@@ -987,4 +1022,28 @@ public class TransactionLog implements Closeable {
+ position();
}
}
+
+ /** Opens {@link OutputStream} from {@link FileChannel}. */
+ protected interface OutputStreamOpener {
+
+ /**
+ * Opens an {@link OutputStream} to write in a {@link FileChannel}.
+ *
+ * @param position The initial write position of the {@link OutputStream} view of the {@link
+ * FileChannel}.
+ */
+ OutputStream open(FileChannel channel, long position) throws IOException;
+ }
+
+ /** Opens {@link ChannelFastInputStream} from {@link FileChannel}. */
+ protected interface ChannelInputStreamOpener {
+
+ /**
+ * Opens a {@link ChannelFastInputStream} to read a {@link FileChannel}.
+ *
+ * @param position The initial read position of the {@link OutputStream} view of the {@link
+ * FileChannel}.
+ */
+ ChannelFastInputStream open(FileChannel channel, long position) throws IOException;
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 1b29405f747..0b008a43d74 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -1343,9 +1344,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
copyOverOldUpdatesMeter.mark();
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams());
- TransactionLog.LogReader logReader = oldTlog.getReader(0);
+ TransactionLog.LogReader logReader = null;
Object o = null;
try {
+ logReader = oldTlog.getReader(0);
while ((o = logReader.next()) != null) {
try {
List<?> entry = (List<?>) o;
@@ -1921,10 +1923,14 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
recoveryInfo.positionOfStart,
inSortedOrder);
long lastStatusTime = System.nanoTime();
- if (inSortedOrder) {
- tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
- } else {
- tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+ try {
+ if (inSortedOrder) {
+ tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
+ } else {
+ tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
// NOTE: we don't currently handle a core reload during recovery. This would cause the core