This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 780875b1ad8 [To rel/1.1][IOTDB-6087] Implement stream interface of
Mods read
780875b1ad8 is described below
commit 780875b1ad8346e7cb5a035163d77f1254efc2bf
Author: Weihao Li <[email protected]>
AuthorDate: Wed Aug 23 09:57:22 2023 +0800
[To rel/1.1][IOTDB-6087] Implement stream interface of Mods read
---
.../db/engine/modification/ModificationFile.java | 54 +--
.../io/LocalTextModificationAccessor.java | 177 ++++++--
.../engine/modification/io/ModificationReader.java | 9 +
.../engine/modification/io/ModificationWriter.java | 7 +-
.../modification/utils/TracedBufferedReader.java | 462 ---------------------
.../iotdb/db/engine/storagegroup/DataRegion.java | 91 ++--
.../iotdb/db/query/context/QueryContext.java | 2 +-
.../engine/modification/ModificationFileTest.java | 1 -
.../io/LocalTextModificationAccessorTest.java | 96 +++--
9 files changed, 274 insertions(+), 625 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 54ddbc4acda..6c9c063f9e2 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -29,14 +29,14 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Random;
/**
@@ -49,10 +49,11 @@ public class ModificationFile implements AutoCloseable {
public static final String FILE_SUFFIX = ".mods";
public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
- // lazy loaded, set null when closed
- private List<Modification> modifications;
- private ModificationWriter writer;
- private ModificationReader reader;
+ // whether to verify the last line, it may be incomplete in extreme cases
+ private boolean needVerify = true;
+
+ private final ModificationWriter writer;
+ private final ModificationReader reader;
private String filePath;
private Random random = new Random();
@@ -68,33 +69,11 @@ public class ModificationFile implements AutoCloseable {
this.filePath = filePath;
}
- private void init() {
- synchronized (this) {
- modifications = (List<Modification>) reader.read();
- }
- }
-
- private void checkInit() {
- if (modifications == null) {
- init();
- }
- }
-
/** Release resources such as streams and caches. */
@Override
public void close() throws IOException {
synchronized (this) {
writer.close();
- modifications = null;
- }
- }
-
- public void abort() throws IOException {
- synchronized (this) {
- writer.abort();
- if (modifications != null && !modifications.isEmpty()) {
- modifications.remove(modifications.size() - 1);
- }
}
}
@@ -107,13 +86,19 @@ public class ModificationFile implements AutoCloseable {
*/
public void write(Modification mod) throws IOException {
synchronized (this) {
- writer.write(mod);
- if (modifications != null) {
- modifications.add(mod);
+ if (needVerify && new File(filePath).exists()) {
+ writer.mayTruncateLastLine();
+ needVerify = false;
}
+ writer.write(mod);
}
}
+ /**
+ * Truncates the underlying modification storage by delegating to {@code writer.truncate(size)}.
+ *
+ * <p>Unlike {@link #write(Modification)} and {@link #close()}, this method does not synchronize
+ * on this object; presumably the caller is expected to hold the lock named in the annotation
+ * below.
+ *
+ * @param size the target size that is passed through to the writer
+ */
+ @GuardedBy("TsFileResource-WriteLock")
+ public void truncate(long size) {
+ writer.truncate(size);
+ }
+
/**
* Get all modifications stored in this file.
*
@@ -121,11 +106,14 @@ public class ModificationFile implements AutoCloseable {
*/
public Collection<Modification> getModifications() {
synchronized (this) {
- checkInit();
- return new ArrayList<>(modifications);
+ return reader.read();
}
}
+ /**
+ * Returns a lazy view of the modifications stored in this file. Nothing is read when this
+ * method is called: every call to {@code iterator()} on the returned object asks the reader
+ * for a new modification iterator.
+ *
+ * <p>Unlike {@link #getModifications()}, this method does not synchronize on this object.
+ *
+ * @return an {@link Iterable} backed by {@code reader.getModificationIterator()}
+ */
+ public Iterable<Modification> getModificationsIter() {
+ return reader::getModificationIterator;
+ }
+
public String getFilePath() {
return filePath;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 88b92931875..ce032c648a8 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -21,24 +21,28 @@ package org.apache.iotdb.db.engine.modification.io;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.utils.TracedBufferedReader;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
/**
* LocalTextModificationAccessor uses a file on local file system to store the
modifications in text
@@ -49,9 +53,10 @@ public class LocalTextModificationAccessor
private static final Logger logger =
LoggerFactory.getLogger(LocalTextModificationAccessor.class);
private static final String SEPARATOR = ",";
- private static final String ABORT_MARK = "aborted";
+ private static final String NO_MODIFICATION_MSG =
+ "No modification has been written to this file[{}]";
- private String filePath;
+ private final String filePath;
private BufferedWriter writer;
/**
@@ -65,44 +70,82 @@ public class LocalTextModificationAccessor
@Override
public Collection<Modification> read() {
+ List<Modification> result = new ArrayList<>();
+ Iterator<Modification> iterator = getModificationIterator();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ return result;
+ }
+
+ // we need to hold the reader for the Iterator, cannot use auto close or
close in finally block
+ @SuppressWarnings("java:S2095")
+ @Override
+ public Iterator<Modification> getModificationIterator() {
File file = FSFactoryProducer.getFSFactory().getFile(filePath);
- if (!file.exists()) {
- logger.debug("No modification has been written to this file");
- return new ArrayList<>();
- }
-
- String line;
- long truncatedSize = 0;
- boolean crashed = false;
- List<Modification> modificationList = new ArrayList<>();
- try (TracedBufferedReader reader = new TracedBufferedReader(new
FileReader(file))) {
- while ((line = reader.readLine()) != null) {
- if (line.equals(ABORT_MARK) && !modificationList.isEmpty()) {
- modificationList.remove(modificationList.size() - 1);
- } else {
- modificationList.add(decodeModification(line));
+ final BufferedReader reader;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+ } catch (FileNotFoundException e) {
+ logger.debug(NO_MODIFICATION_MSG, file);
+
+ // return empty iterator
+ return new Iterator<Modification>() {
+ @Override
+ public boolean hasNext() {
+ return false;
}
- truncatedSize = reader.position();
+
+ @Override
+ public Modification next() {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+
+ final Modification[] cachedModification = new Modification[1];
+ return new Iterator<Modification>() {
+      /**
+       * Reports whether another modification is available, reading and decoding the next line
+       * on demand.
+       *
+       * <p>The reader is closed as soon as the iteration ends, whether because the end of the
+       * file was reached, a line could not be decoded, or reading failed. In every one of these
+       * cases {@code false} is returned, so a {@code true} answer always means that {@code
+       * next()} has an element to hand out.
+       *
+       * @return {@code true} if a decoded modification is cached and ready for {@code next()}
+       */
+      @Override
+      public boolean hasNext() {
+        // an element decoded by an earlier call has not been consumed yet
+        if (cachedModification[0] != null) {
+          return true;
+        }
+        try {
+          String line = reader.readLine();
+          if (line == null) {
+            // end of file: release the reader, there is nothing more to return
+            reader.close();
+            return false;
+          }
+          return decodeModificationAndCache(reader, cachedModification, line);
+        } catch (IOException e) {
+          logger.warn("An error occurred when reading modifications", e);
+          // Nothing is cached at this point, so answering true would make next() throw
+          // NoSuchElementException. End the iteration and release the reader instead.
+          try {
+            reader.close();
+          } catch (IOException ignored) {
+            // best effort: the read failure has already been logged above
+          }
+          return false;
+        }
       }
- } catch (IOException e) {
- crashed = true;
- logger.error(
- "An error occurred when reading modifications, and the remaining
modifications will be truncated to size {}.",
- truncatedSize,
- e);
- }
-
- if (crashed) {
- try (FileOutputStream outputStream = new FileOutputStream(file, true)) {
- outputStream.getChannel().truncate(truncatedSize);
- } catch (FileNotFoundException e) {
- logger.debug("No modification has been written to this file");
- } catch (IOException e) {
- logger.error(
- "An error occurred when truncating modifications to size {}.",
truncatedSize, e);
+
+ @Override
+ public Modification next() {
+ if (cachedModification[0] == null) {
+ throw new NoSuchElementException();
+ }
+ Modification result = cachedModification[0];
+ cachedModification[0] = null;
+ return result;
}
+ };
+ }
+
+  /**
+   * Decodes one line of the modification file and stores the result in {@code
+   * cachedModification[0]}.
+   *
+   * <p>If the line cannot be decoded, the slot is cleared, the reader is closed and {@code
+   * false} is returned, which ends the iteration at the first undecodable line.
+   *
+   * @param reader the reader the line came from; closed when decoding fails
+   * @param cachedModification one-element holder that receives the decoded modification
+   * @param line the raw line to decode
+   * @return {@code true} if the line was decoded and cached, {@code false} otherwise
+   * @throws IOException if closing the reader after a decode failure fails
+   */
+  private boolean decodeModificationAndCache(
+      BufferedReader reader, Modification[] cachedModification, String line) throws IOException {
+    try {
+      cachedModification[0] = decodeModification(line);
+      return true;
+    } catch (IOException e) {
+      // pass the exception as the last argument so the cause is logged together with the line
+      logger.warn("An error occurred when decode line-[{}] to modification", line, e);
+      cachedModification[0] = null;
+      reader.close();
+      return false;
     }
-    return modificationList;
   }
@Override
@@ -114,23 +157,71 @@ public class LocalTextModificationAccessor
}
@Override
- public void abort() throws IOException {
+ public void write(Modification mod) throws IOException {
if (writer == null) {
writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
}
- writer.write(ABORT_MARK);
+ writer.write(encodeModification(mod));
writer.newLine();
writer.flush();
}
- @Override
- public void write(Modification mod) throws IOException {
+ @TestOnly
+ public void writeInComplete(Modification mod) throws IOException {
if (writer == null) {
writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
}
- writer.write(encodeModification(mod));
- writer.newLine();
- writer.flush();
+ String line = encodeModification(mod);
+ if (line != null) {
+ writer.write(line.substring(0, 2));
+ }
+ }
+
+  /**
+   * Test helper that simulates a write failing half-way: it hands the first characters of the
+   * encoded modification to the writer via {@link #writeInComplete(Modification)} and then
+   * always throws.
+   *
+   * @param mod the modification whose encoding is partially written
+   * @throws IOException always, after the partial write
+   */
+  @TestOnly
+  public void writeMeetException(Modification mod) throws IOException {
+    // writeInComplete opens the writer lazily itself, so no separate initialisation is needed
+    writeInComplete(mod);
+    throw new IOException();
+  }
+
+ /**
+ * Truncates the modification file at {@code filePath} to {@code size} through a file channel.
+ *
+ * <p>Failures are logged and never propagated: a missing file is reported at debug level,
+ * any other I/O error at error level.
+ *
+ * @param size the size handed to {@code FileChannel.truncate}
+ */
+ @Override
+ public void truncate(long size) {
+ // the stream is opened with append=true, presumably so that opening it does not itself
+ // discard the existing content before the channel is truncated
+ try (FileOutputStream outputStream =
+ new
FileOutputStream(FSFactoryProducer.getFSFactory().getFile(filePath), true)) {
+ outputStream.getChannel().truncate(size);
+ // note: this message is emitted after the truncate call has returned
+ logger.warn("The modifications[{}] will be truncated to size {}.",
filePath, size);
+ } catch (FileNotFoundException e) {
+ logger.debug(NO_MODIFICATION_MSG, filePath);
+ } catch (IOException e) {
+ logger.error(
+ "An error occurred when truncating modifications[{}] to size {}.",
filePath, size, e);
+ }
+ }
+
+  /**
+   * Drops an incomplete last line from the modification file, if there is one.
+   *
+   * <p>A line counts as complete when it ends with {@code '\n'}. If the last byte of the file is
+   * not a line feed, the file is truncated right after the last line feed it contains, or to
+   * length zero when it contains none. An empty file is left untouched.
+   *
+   * <p>I/O failures are logged and not propagated.
+   */
+  @Override
+  public void mayTruncateLastLine() {
+    try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
+      long length = file.length();
+      if (length == 0) {
+        // nothing written yet, so there is no last line to verify
+        return;
+      }
+
+      file.seek(length - 1);
+      if (file.readByte() == '\n') {
+        // the last line is properly terminated
+        return;
+      }
+
+      // walk backwards to the last line feed; ends at -1 if the file has none
+      long lastLineFeed = length - 2;
+      while (lastLineFeed >= 0) {
+        file.seek(lastLineFeed);
+        if (file.readByte() == '\n') {
+          break;
+        }
+        lastLineFeed--;
+      }
+      logger.warn("The last line of Mods is incomplete, will be truncated");
+      // keep everything up to and including that line feed (0 bytes when there is none)
+      truncate(lastLineFeed + 1);
+    } catch (IOException e) {
+      logger.error("An error occurred when reading modifications", e);
+    }
   }
private static String encodeModification(Modification mod) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
index df230373d00..99928b4c18c 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
/** ModificationReader reads all modifications from a persistent medium like
file system. */
public interface ModificationReader {
@@ -35,6 +36,14 @@ public interface ModificationReader {
*/
Collection<Modification> read();
+ /**
+ * Get an iterator over this mod file; in all other respects it behaves consistently with
+ * {@link #read()}. Ensure that hasNext() has been called and has returned {@code true} before
+ * calling next().
+ *
+ * @return the modification iterator.
+ */
+ Iterator<Modification> getModificationIterator();
+
/** Release resources like streams. */
void close() throws IOException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 64a500541e3..a097a6ba72b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -37,9 +37,10 @@ public interface ModificationWriter {
*/
void write(Modification mod) throws IOException;
+ void truncate(long size);
+
+ void mayTruncateLastLine();
+
/** Release resources like streams. */
void close() throws IOException;
-
- /** Abort last modification. Notice that after calling abort(), a fileWriter
is opened. */
- void abort() throws IOException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
b/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
deleted file mode 100644
index 3b92aaf13a8..00000000000
---
a/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.engine.modification.utils;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.UncheckedIOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * Copied from {@link java.io.BufferedReader}, trace the read position by
modifying the fill()
- * method.
- */
-public class TracedBufferedReader extends Reader {
- private Reader in;
-
- private char cb[];
- private int nChars, nextChar;
-
- private static final int INVALIDATED = -2;
- private static final int UNMARKED = -1;
- private int markedChar = UNMARKED;
- private int readAheadLimit = 0; /* Valid only when markedChar > 0 */
-
- /** If the next character is a line feed, skip it */
- private boolean skipLF = false;
-
- /** The skipLF flag when the mark was set */
- private boolean markedSkipLF = false;
-
- private static int defaultCharBufferSize = 8192;
- private static int defaultExpectedLineLength = 80;
-
- /** the total bytes number already filled into cb */
- private long totalFilledBytesNum = 0;
-
- /**
- * Creates a buffering character-input stream that uses an input buffer of
the specified size.
- *
- * @param in A Reader
- * @param sz Input-buffer size
- * @exception IllegalArgumentException If {@code sz <= 0}
- */
- public TracedBufferedReader(Reader in, int sz) {
- super(in);
- if (sz <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- this.in = in;
- cb = new char[sz];
- nextChar = nChars = 0;
- }
-
- /**
- * Creates a buffering character-input stream that uses a default-sized
input buffer.
- *
- * @param in A Reader
- */
- public TracedBufferedReader(Reader in) {
- this(in, defaultCharBufferSize);
- }
-
- /** Checks to make sure that the stream has not been closed */
- private void ensureOpen() throws IOException {
- if (in == null) {
- throw new IOException("Stream closed");
- }
- }
-
- /** {@link BufferedReader#fill()} */
- private void fill() throws IOException {
- int dst;
- if (markedChar <= UNMARKED) {
- /* No mark */
- dst = 0;
- } else {
- /* Marked */
- int delta = nextChar - markedChar;
- if (delta >= readAheadLimit) {
- /* Gone past read-ahead limit: Invalidate mark */
- markedChar = INVALIDATED;
- readAheadLimit = 0;
- dst = 0;
- } else {
- if (readAheadLimit <= cb.length) {
- /* Shuffle in the current buffer */
- System.arraycopy(cb, markedChar, cb, 0, delta);
- markedChar = 0;
- dst = delta;
- } else {
- /* Reallocate buffer to accommodate read-ahead limit */
- char ncb[] = new char[readAheadLimit];
- System.arraycopy(cb, markedChar, ncb, 0, delta);
- cb = ncb;
- markedChar = 0;
- dst = delta;
- }
- nextChar = nChars = delta;
- }
- }
-
- int n;
- do {
- n = in.read(cb, dst, cb.length - dst);
- } while (n == 0);
- if (n > 0) {
- nChars = dst + n;
- nextChar = dst;
- totalFilledBytesNum = totalFilledBytesNum + n;
- }
- }
-
- /** {@link BufferedReader#read()} */
- @Override
- public int read() throws IOException {
- synchronized (lock) {
- ensureOpen();
- for (; ; ) {
- if (nextChar >= nChars) {
- fill();
- if (nextChar >= nChars) {
- return -1;
- }
- }
- if (skipLF) {
- skipLF = false;
- if (cb[nextChar] == '\n') {
- nextChar++;
- continue;
- }
- }
- return cb[nextChar++];
- }
- }
- }
-
- /** {@link BufferedReader#read1(char[], int, int)} */
- private int read1(char[] cbuf, int off, int len) throws IOException {
- if (nextChar >= nChars) {
- /* If the requested length is at least as large as the buffer, and
- if there is no mark/reset activity, and if line feeds are not
- being skipped, do not bother to copy the characters into the
- local buffer. In this way buffered streams will cascade
- harmlessly. */
- if (len >= cb.length && markedChar <= UNMARKED && !skipLF) {
- return in.read(cbuf, off, len);
- }
- fill();
- }
- if (nextChar >= nChars) {
- return -1;
- }
- if (skipLF) {
- skipLF = false;
- if (cb[nextChar] == '\n') {
- nextChar++;
- if (nextChar >= nChars) {
- fill();
- }
- if (nextChar >= nChars) {
- return -1;
- }
- }
- }
- int n = Math.min(len, nChars - nextChar);
- System.arraycopy(cb, nextChar, cbuf, off, n);
- nextChar += n;
- return n;
- }
-
- /** {@link BufferedReader#read(char[], int, int)} */
- @Override
- public int read(char cbuf[], int off, int len) throws IOException {
- synchronized (lock) {
- ensureOpen();
- if ((off < 0)
- || (off > cbuf.length)
- || (len < 0)
- || ((off + len) > cbuf.length)
- || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
- int n = read1(cbuf, off, len);
- if (n <= 0) {
- return n;
- }
- while ((n < len) && in.ready()) {
- int n1 = read1(cbuf, off + n, len - n);
- if (n1 <= 0) {
- break;
- }
- n += n1;
- }
- return n;
- }
- }
-
- /** {@link BufferedReader#readLine(boolean)} */
- String readLine(boolean ignoreLF) throws IOException {
- StringBuilder s = null;
- int startChar;
-
- synchronized (lock) {
- ensureOpen();
- boolean omitLF = ignoreLF || skipLF;
-
- bufferLoop:
- for (; ; ) {
-
- if (nextChar >= nChars) {
- fill();
- }
- if (nextChar >= nChars) {
- /* EOF */
- if (s != null && s.length() > 0) {
- return s.toString();
- } else {
- return null;
- }
- }
- boolean eol = false;
- char c = 0;
- int i;
-
- /* Skip a leftover '\n', if necessary */
- if (omitLF && (cb[nextChar] == '\n')) {
- nextChar++;
- }
- skipLF = false;
- omitLF = false;
-
- charLoop:
- for (i = nextChar; i < nChars; i++) {
- c = cb[i];
- if ((c == '\n') || (c == '\r')) {
- eol = true;
- break charLoop;
- }
- }
-
- startChar = nextChar;
- nextChar = i;
-
- if (eol) {
- String str;
- if (s == null) {
- str = new String(cb, startChar, i - startChar);
- } else {
- s.append(cb, startChar, i - startChar);
- str = s.toString();
- }
- nextChar++;
- if (c == '\r') {
- skipLF = true;
- if (read() != -1) {
- nextChar--;
- }
- }
- return str;
- }
-
- if (s == null) {
- s = new StringBuilder(defaultExpectedLineLength);
- }
- s.append(cb, startChar, i - startChar);
- }
- }
- }
-
- /** {@link BufferedReader#readLine()} */
- public String readLine() throws IOException {
- return readLine(false);
- }
-
- /** {@link BufferedReader#skip(long)} */
- @Override
- public long skip(long n) throws IOException {
- if (n < 0L) {
- throw new IllegalArgumentException("skip value is negative");
- }
- synchronized (lock) {
- ensureOpen();
- long r = n;
- while (r > 0) {
- if (nextChar >= nChars) {
- fill();
- }
- if (nextChar >= nChars) {
- /* EOF */
- break;
- }
- if (skipLF) {
- skipLF = false;
- if (cb[nextChar] == '\n') {
- nextChar++;
- }
- }
- long d = (long) nChars - nextChar;
- if (r <= d) {
- nextChar += r;
- r = 0;
- break;
- } else {
- r -= d;
- nextChar = nChars;
- }
- }
- return n - r;
- }
- }
-
- /** {@link BufferedReader#ready()} */
- @Override
- public boolean ready() throws IOException {
- synchronized (lock) {
- ensureOpen();
-
- /*
- * If newline needs to be skipped and the next char to be read
- * is a newline character, then just skip it right away.
- */
- if (skipLF) {
- /* Note that in.ready() will return true if and only if the next
- * read on the stream will not block.
- */
- if (nextChar >= nChars && in.ready()) {
- fill();
- }
- if (nextChar < nChars) {
- if (cb[nextChar] == '\n') {
- nextChar++;
- }
- skipLF = false;
- }
- }
- return (nextChar < nChars) || in.ready();
- }
- }
-
- /** {@link BufferedReader#markSupported()} */
- @Override
- public boolean markSupported() {
- return true;
- }
-
- /** {@link BufferedReader#mark(int)} */
- @Override
- public void mark(int readAheadLimit) throws IOException {
- if (readAheadLimit < 0) {
- throw new IllegalArgumentException("Read-ahead limit < 0");
- }
- synchronized (lock) {
- ensureOpen();
- this.readAheadLimit = readAheadLimit;
- markedChar = nextChar;
- markedSkipLF = skipLF;
- }
- }
-
- /** {@link BufferedReader#reset()} */
- @Override
- public void reset() throws IOException {
- synchronized (lock) {
- ensureOpen();
- if (markedChar < 0) {
- throw new IOException((markedChar == INVALIDATED) ? "Mark invalid" :
"Stream not marked");
- }
- nextChar = markedChar;
- skipLF = markedSkipLF;
- }
- }
-
- /** {@link BufferedReader#close()} */
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- if (in == null) {
- return;
- }
- try {
- in.close();
- } finally {
- in = null;
- cb = null;
- }
- }
- }
-
- /** {@link BufferedReader#lines()} */
- public Stream<String> lines() {
- Iterator<String> iter =
- new Iterator<String>() {
- String nextLine = null;
-
- @Override
- public boolean hasNext() {
- if (nextLine != null) {
- return true;
- } else {
- try {
- nextLine = readLine();
- return (nextLine != null);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
-
- @Override
- public String next() {
- if (nextLine != null || hasNext()) {
- String line = nextLine;
- nextLine = null;
- return line;
- } else {
- throw new NoSuchElementException();
- }
- }
- };
- return StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED |
Spliterator.NONNULL),
- false);
- }
-
- /**
- * Returns this reader's file position.
- *
- * @return This reader's file position, a non-negative integer counting the
number of bytes from
- * the beginning of the file to the current position
- */
- public long position() {
- // position = totalFilledBytesNum - lastFilledBytesNum +
readOffsetInLastFilledBytes
- // lastFilledBytesNum = nChars - dst, readOffsetInLastFilledBytes =
nextChar - dst
- return totalFilledBytesNum - nChars + nextChar;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 35aa650831a..f9fe55e2d62 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1888,8 +1888,6 @@ public class DataRegion implements IDataRegionForQuery {
// mod files in mergingModification, sequenceFileList, and
unsequenceFileList
writeLock("delete");
- // record files which are updated so that we can roll back them in case of
exception
- List<ModificationFile> updatedModFiles = new ArrayList<>();
boolean hasReleasedLock = false;
try {
@@ -1917,21 +1915,13 @@ public class DataRegion implements IDataRegionForQuery {
List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
separateTsFile(sealedTsFileResource, unsealedTsFileResource);
- deleteDataInFiles(
- unsealedTsFileResource, deletion, devicePaths, updatedModFiles,
timePartitionFilter);
+ deleteDataInFiles(unsealedTsFileResource, deletion, devicePaths,
timePartitionFilter);
writeUnlock();
hasReleasedLock = true;
- deleteDataInFiles(
- sealedTsFileResource, deletion, devicePaths, updatedModFiles,
timePartitionFilter);
+ deleteDataInFiles(sealedTsFileResource, deletion, devicePaths,
timePartitionFilter);
} catch (Exception e) {
- // roll back
- for (ModificationFile modFile : updatedModFiles) {
- modFile.abort();
- // remember to close mod file
- modFile.close();
- }
throw new IOException(e);
} finally {
if (!hasReleasedLock) {
@@ -2022,11 +2012,12 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
+ // suppress warn of Throwable catch
+ @SuppressWarnings("java:S1181")
private void deleteDataInFiles(
Collection<TsFileResource> tsFileResourceList,
Deletion deletion,
Set<PartialPath> devicePaths,
- List<ModificationFile> updatedModFiles,
TimePartitionFilter timePartitionFilter)
throws IOException {
for (TsFileResource tsFileResource : tsFileResourceList) {
@@ -2039,39 +2030,50 @@ public class DataRegion implements IDataRegionForQuery {
continue;
}
+ ModificationFile modFile = tsFileResource.getModFile();
if (tsFileResource.isClosed()) {
- // delete data in sealed file
- if (tsFileResource.isCompacting()) {
- // we have to set modification offset to MAX_VALUE, as the offset of
source chunk may
- // change after compaction
- deletion.setFileOffset(Long.MAX_VALUE);
- // write deletion into compaction modification file
- tsFileResource.getCompactionModFile().write(deletion);
- // write deletion into modification file to enable query during
compaction
- tsFileResource.getModFile().write(deletion);
- // remember to close mod file
- tsFileResource.getCompactionModFile().close();
- tsFileResource.getModFile().close();
- } else {
- deletion.setFileOffset(tsFileResource.getTsFileSize());
- // write deletion into modification file
- boolean modFileExists = tsFileResource.getModFile().exists();
- long originSize = tsFileResource.getModFile().getSize();
- tsFileResource.getModFile().write(deletion);
- // remember to close mod file
- tsFileResource.getModFile().close();
- if (!modFileExists) {
- TsFileMetricManager.getInstance().increaseModFileNum(1);
+ long originSize = -1;
+ synchronized (modFile) {
+ try {
+ originSize = modFile.getSize();
+ // delete data in sealed file
+ if (tsFileResource.isCompacting()) {
+ // we have to set modification offset to MAX_VALUE, as the
offset of source chunk may
+ // change after compaction
+ deletion.setFileOffset(Long.MAX_VALUE);
+ // write deletion into compaction modification file
+ tsFileResource.getCompactionModFile().write(deletion);
+ // write deletion into modification file to enable read during
compaction
+ modFile.write(deletion);
+ // remember to close mod file
+ tsFileResource.getCompactionModFile().close();
+ modFile.close();
+ } else {
+ deletion.setFileOffset(tsFileResource.getTsFileSize());
+ // write deletion into modification file
+ boolean modFileExists = tsFileResource.getModFile().exists();
+ tsFileResource.getModFile().write(deletion);
+ // remember to close mod file
+ tsFileResource.getModFile().close();
+ if (!modFileExists) {
+ TsFileMetricManager.getInstance().increaseModFileNum(1);
+ }
+ TsFileMetricManager.getInstance()
+ .increaseModFileSize(tsFileResource.getModFile().getSize() -
originSize);
+ }
+ } catch (Throwable t) {
+ if (originSize != -1) {
+ modFile.truncate(originSize);
+ }
+ throw t;
}
- TsFileMetricManager.getInstance()
- .increaseModFileSize(tsFileResource.getModFile().getSize() -
originSize);
+ logger.info(
+ "[Deletion] Deletion with path:{}, time:{}-{} written into mods
file:{}.",
+ deletion.getPath(),
+ deletion.getStartTime(),
+ deletion.getEndTime(),
+ modFile.getFilePath());
}
- logger.info(
- "[Deletion] Deletion with path:{}, time:{}-{} written into mods
file:{}.",
- deletion.getPath(),
- deletion.getStartTime(),
- deletion.getEndTime(),
- tsFileResource.getModFile().getFilePath());
} else {
// delete data in memory of unsealed file
tsFileResource.getProcessor().deleteDataInMemory(deletion,
devicePaths);
@@ -2081,9 +2083,6 @@ public class DataRegion implements IDataRegionForQuery {
SyncService.getInstance().getOrCreateSyncManager(dataRegionId)) {
syncManager.syncRealTimeDeletion(deletion);
}
-
- // add a record in case of rollback
- updatedModFiles.add(tsFileResource.getModFile());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
index 2a44762fddd..7fabfc378e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -108,7 +108,7 @@ public class QueryContext {
fileModCache.get(modFile.getFilePath());
if (allModifications == null) {
allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
- for (Modification modification : modFile.getModifications()) {
+ for (Modification modification : modFile.getModificationsIter()) {
allModifications.append(modification.getPath(), modification);
}
fileModCache.put(modFile.getFilePath(), allModifications);
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
index cacd23f2246..fa17cbd4e30 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -89,7 +89,6 @@ public class ModificationFileTest {
mFile.write(modifications[i]);
}
modificationList = (List<Modification>) mFile.getModifications();
- mFile.abort();
for (int i = 0; i < 3; i++) {
assertEquals(modifications[i], modificationList.get(i));
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
index 4a907b406c3..e0369928cce 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
@@ -24,11 +24,10 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
+import org.junit.AfterClass;
import org.junit.Test;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,17 +37,53 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class LocalTextModificationAccessorTest {
+ private static Modification[] modifications =
+ new Modification[] {
+ new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
+ new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
+ new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
+ new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
+ };
+
+ @AfterClass
+ public static void tearDown() {
+ modifications = null;
+ }
+
+ @Test
+ public void writeMeetException() throws IOException {
+ String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
+ long length = 0;
+ LocalTextModificationAccessor accessor = null;
+ try {
+ accessor = new LocalTextModificationAccessor(tempFileName);
+ for (int i = 0; i < 2; i++) {
+ accessor.write(modifications[i]);
+ }
+ length = new File(tempFileName).length();
+ // the current line should be truncated when meet exception
+ accessor.writeMeetException(modifications[2]);
+ for (int i = 2; i < 4; i++) {
+ accessor.write(modifications[i]);
+ }
+ List<Modification> modificationList = (List<Modification>) accessor.read();
+ assertEquals(4, modificationList.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(modifications[i], modificationList.get(i));
+ }
+ } catch (IOException e) {
+ accessor.truncate(length);
+ } finally {
+ if (accessor != null) {
+ accessor.close();
+ }
+ new File(tempFileName).delete();
+ }
+ }
@Test
public void readMyWrite() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
- Modification[] modifications =
- new Modification[] {
- new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
- new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
- new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
- new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
- };
try (LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName)) {
for (int i = 0; i < 2; i++) {
accessor.write(modifications[i]);
@@ -75,52 +110,41 @@ public class LocalTextModificationAccessorTest {
@Test
public void readNull() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
- LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
- new File(tempFileName).delete();
- Collection<Modification> modifications = accessor.read();
- assertEquals(new ArrayList<>(), modifications);
+ try (LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName)) {
+ new File(tempFileName).delete();
+ Collection<Modification> modifications = accessor.read();
+ assertEquals(new ArrayList<>(), modifications);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
}
@Test
- public void readAndTruncate() {
+ public void readMeetError() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
File file = new File(tempFileName);
if (file.exists()) {
file.delete();
}
- Modification[] modifications =
- new Modification[] {
- new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
- new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
- new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
- new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
- };
- try (LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
- BufferedWriter writer = new BufferedWriter(new FileWriter(tempFileName, true))) {
+ try (LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName)) {
// write normal message
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 4; i++) {
accessor.write(modifications[i]);
}
List<Modification> modificationList = (List<Modification>) accessor.read();
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 4; i++) {
assertEquals(modifications[i], modificationList.get(i));
}
// write error message
- long length = file.length();
- writer.write("error");
- writer.newLine();
- writer.flush();
- // write normal message & read
- for (int i = 2; i < 4; i++) {
- accessor.write(modifications[i]);
- }
+ accessor.writeInComplete(modifications[0]);
+
modificationList = (List<Modification>) accessor.read();
- for (int i = 0; i < 2; i++) {
+ // the error line is ignored
+ assertEquals(4, modificationList.size());
+ for (int i = 0; i < 4; i++) {
System.out.println(modificationList);
assertEquals(modifications[i], modificationList.get(i));
}
- // check truncated file
- assertEquals(length, file.length());
} catch (IOException e) {
fail(e.getMessage());
} finally {