This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4875c24c566 [IOTDB-6087] Implement stream interface of Mods read
4875c24c566 is described below
commit 4875c24c5661a06a84e38cf4e54a8a43b996789a
Author: Weihao Li <[email protected]>
AuthorDate: Fri Jul 28 10:34:15 2023 +0800
[IOTDB-6087] Implement stream interface of Mods read
---
.../execution/fragment/QueryContext.java | 2 +-
.../db/storageengine/dataregion/DataRegion.java | 111 +++--
.../dataregion/modification/ModificationFile.java | 59 +--
.../io/LocalTextModificationAccessor.java | 177 ++++++--
.../modification/io/ModificationReader.java | 9 +
.../modification/io/ModificationWriter.java | 7 +-
.../modification/utils/TracedBufferedReader.java | 463 ---------------------
.../modification/ModificationFileTest.java | 25 +-
.../io/LocalTextModificationAccessorTest.java | 96 +++--
9 files changed, 289 insertions(+), 660 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index d927d90c529..60ee9baeaac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -95,7 +95,7 @@ public class QueryContext {
fileModCache.get(modFile.getFilePath());
if (allModifications == null) {
allModifications = PatternTreeMapFactory.getModsPatternTreeMap();
- for (Modification modification : modFile.getModifications()) {
+ for (Modification modification : modFile.getModificationsIter()) {
allModifications.append(modification.getPath(), modification);
}
fileModCache.put(modFile.getFilePath(), allModifications);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 102392fae98..7ba71add820 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1818,8 +1818,6 @@ public class DataRegion implements IDataRegionForQuery {
// mod files in mergingModification, sequenceFileList, and
unsequenceFileList
writeLock("delete");
- // record files which are updated so that we can roll back them in case of
exception
- List<ModificationFile> updatedModFiles = new ArrayList<>();
boolean hasReleasedLock = false;
try {
@@ -1855,30 +1853,14 @@ public class DataRegion implements IDataRegionForQuery {
// deviceMatchInfo contains the DeviceId means this device matched the
pattern
Set<String> deviceMatchInfo = new HashSet<>();
deleteDataInFiles(
- unsealedTsFileResource,
- deletion,
- devicePaths,
- updatedModFiles,
- timePartitionFilter,
- deviceMatchInfo);
+ unsealedTsFileResource, deletion, devicePaths, timePartitionFilter,
deviceMatchInfo);
writeUnlock();
hasReleasedLock = true;
deleteDataInFiles(
- sealedTsFileResource,
- deletion,
- devicePaths,
- updatedModFiles,
- timePartitionFilter,
- deviceMatchInfo);
+ sealedTsFileResource, deletion, devicePaths, timePartitionFilter,
deviceMatchInfo);
} catch (Exception e) {
- // roll back
- for (ModificationFile modFile : updatedModFiles) {
- modFile.abort();
- // remember to close mod file
- modFile.close();
- }
throw new IOException(e);
} finally {
if (!hasReleasedLock) {
@@ -1985,11 +1967,12 @@ public class DataRegion implements IDataRegionForQuery {
return true;
}
+ // suppress warn of Throwable catch
+ @SuppressWarnings("java:S1181")
private void deleteDataInFiles(
Collection<TsFileResource> tsFileResourceList,
Deletion deletion,
Set<PartialPath> devicePaths,
- List<ModificationFile> updatedModFiles,
TimePartitionFilter timePartitionFilter,
Set<String> deviceMatchInfo)
throws IOException {
@@ -2006,52 +1989,60 @@ public class DataRegion implements IDataRegionForQuery {
ModificationFile modFile = tsFileResource.getModFile();
if (tsFileResource.isClosed()) {
- // delete data in sealed file
- if (tsFileResource.isCompacting()) {
- // we have to set modification offset to MAX_VALUE, as the offset of
source chunk may
- // change after compaction
- deletion.setFileOffset(Long.MAX_VALUE);
- // write deletion into compaction modification file
- tsFileResource.getCompactionModFile().write(deletion);
- // write deletion into modification file to enable read during
compaction
- modFile.write(deletion);
- // remember to close mod file
- tsFileResource.getCompactionModFile().close();
- modFile.close();
- } else {
- deletion.setFileOffset(tsFileResource.getTsFileSize());
- // write deletion into modification file
- boolean modFileExists = modFile.exists();
- long originSize = modFile.getSize();
- modFile.write(deletion);
-
- // remember to close mod file
- modFile.close();
-
- // if file length greater than 1M,execute compact.
- modFile.compact();
+ long originSize = -1;
+ synchronized (modFile) {
+ try {
+ originSize = modFile.getSize();
+ // delete data in sealed file
+ if (tsFileResource.isCompacting()) {
+ // we have to set modification offset to MAX_VALUE, as the
offset of source chunk may
+ // change after compaction
+ deletion.setFileOffset(Long.MAX_VALUE);
+ // write deletion into compaction modification file
+ tsFileResource.getCompactionModFile().write(deletion);
+ // write deletion into modification file to enable read during
compaction
+ modFile.write(deletion);
+ // remember to close mod file
+ tsFileResource.getCompactionModFile().close();
+ modFile.close();
+ } else {
+ deletion.setFileOffset(tsFileResource.getTsFileSize());
+ // write deletion into modification file
+ boolean modFileExists = modFile.exists();
+
+ modFile.write(deletion);
+
+ // remember to close mod file
+ modFile.close();
+
+ // if file length greater than 1M,execute compact.
+ modFile.compact();
+
+ if (!modFileExists) {
+ FileMetrics.getInstance().increaseModFileNum(1);
+ }
- if (!modFileExists) {
- FileMetrics.getInstance().increaseModFileNum(1);
+ // The file size may be smaller than the original file, so the
increment here may be
+ // negative
+ FileMetrics.getInstance().increaseModFileSize(modFile.getSize()
- originSize);
+ }
+ } catch (Throwable t) {
+ if (originSize != -1) {
+ modFile.truncate(originSize);
+ }
+ throw t;
}
-
- // The file size may be smaller than the original file, so the
increment here may be
- // negative
- FileMetrics.getInstance().increaseModFileSize(modFile.getSize() -
originSize);
+ logger.info(
+ "[Deletion] Deletion with path:{}, time:{}-{} written into mods
file:{}.",
+ deletion.getPath(),
+ deletion.getStartTime(),
+ deletion.getEndTime(),
+ modFile.getFilePath());
}
- logger.info(
- "[Deletion] Deletion with path:{}, time:{}-{} written into mods
file:{}.",
- deletion.getPath(),
- deletion.getStartTime(),
- deletion.getEndTime(),
- modFile.getFilePath());
} else {
// delete data in memory of unsealed file
tsFileResource.getProcessor().deleteDataInMemory(deletion,
devicePaths);
}
-
- // add a record in case of rollback
- updatedModFiles.add(modFile);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 609975aec7c..6491f4c4eaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.GuardedBy;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
@@ -53,10 +55,11 @@ public class ModificationFile implements AutoCloseable {
public static final String COMPACT_SUFFIX = ".settle";
public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
- // lazy loaded, set null when closed
- private List<Modification> modifications;
- private ModificationWriter writer;
- private ModificationReader reader;
+ // whether to verify the last line, it may be incomplete in extreme cases
+ private boolean needVerify = true;
+
+ private final ModificationWriter writer;
+ private final ModificationReader reader;
private String filePath;
private final SecureRandom random = new SecureRandom();
@@ -75,33 +78,11 @@ public class ModificationFile implements AutoCloseable {
this.filePath = filePath;
}
- private void init() {
- synchronized (this) {
- modifications = (List<Modification>) reader.read();
- }
- }
-
- private void checkInit() {
- if (modifications == null) {
- init();
- }
- }
-
/** Release resources such as streams and caches. */
@Override
public void close() throws IOException {
synchronized (this) {
writer.close();
- modifications = null;
- }
- }
-
- public void abort() throws IOException {
- synchronized (this) {
- writer.abort();
- if (modifications != null && !modifications.isEmpty()) {
- modifications.remove(modifications.size() - 1);
- }
}
}
@@ -114,13 +95,19 @@ public class ModificationFile implements AutoCloseable {
*/
public void write(Modification mod) throws IOException {
synchronized (this) {
- writer.write(mod);
- if (modifications != null) {
- modifications.add(mod);
+ if (needVerify && new File(filePath).exists()) {
+ writer.mayTruncateLastLine();
+ needVerify = false;
}
+ writer.write(mod);
}
}
+ @GuardedBy("TsFileResource-WriteLock")
+ public void truncate(long size) {
+ writer.truncate(size);
+ }
+
/**
* Get all modifications stored in this file.
*
@@ -128,11 +115,14 @@ public class ModificationFile implements AutoCloseable {
*/
public Collection<Modification> getModifications() {
synchronized (this) {
- checkInit();
- return new ArrayList<>(modifications);
+ return reader.read();
}
}
+ public Iterable<Modification> getModificationsIter() {
+ return reader::getModificationIterator;
+ }
+
public String getFilePath() {
return filePath;
}
@@ -228,7 +218,6 @@ public class ModificationFile implements AutoCloseable {
Files.move(new File(newModsFileName).toPath(), new
File(filePath).toPath());
logger.info("{} settle successful", filePath);
- updateModifications(allSettledModifications);
if (getSize() > COMPACT_THRESHOLD) {
logger.warn(
"After the mod file is settled, the file size is still greater
than 1M,the size of the file before settle is {},after settled the file size is
{}",
@@ -276,10 +265,4 @@ public class ModificationFile implements AutoCloseable {
}
return result;
}
-
- public void updateModifications(List<Modification> modifications) {
- synchronized (this) {
- this.modifications = modifications;
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
index 6e94a24b120..242c6fd21e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java
@@ -21,24 +21,28 @@ package
org.apache.iotdb.db.storageengine.dataregion.modification.io;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.utils.TracedBufferedReader;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
/**
* LocalTextModificationAccessor uses a file on local file system to store the
modifications in text
@@ -49,9 +53,10 @@ public class LocalTextModificationAccessor
private static final Logger logger =
LoggerFactory.getLogger(LocalTextModificationAccessor.class);
private static final String SEPARATOR = ",";
- private static final String ABORT_MARK = "aborted";
+ private static final String NO_MODIFICATION_MSG =
+ "No modification has been written to this file[{}]";
- private String filePath;
+ private final String filePath;
private BufferedWriter writer;
/**
@@ -65,44 +70,82 @@ public class LocalTextModificationAccessor
@Override
public Collection<Modification> read() {
+ List<Modification> result = new ArrayList<>();
+ Iterator<Modification> iterator = getModificationIterator();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ return result;
+ }
+
+ // we need to hold the reader for the Iterator, cannot use auto close or
close in finally block
+ @SuppressWarnings("java:S2095")
+ @Override
+ public Iterator<Modification> getModificationIterator() {
File file = FSFactoryProducer.getFSFactory().getFile(filePath);
- if (!file.exists()) {
- logger.debug("No modification has been written to this file");
- return new ArrayList<>();
- }
-
- String line;
- long truncatedSize = 0;
- boolean crashed = false;
- List<Modification> modificationList = new ArrayList<>();
- try (TracedBufferedReader reader = new TracedBufferedReader(new
FileReader(file))) {
- while ((line = reader.readLine()) != null) {
- if (line.equals(ABORT_MARK) && !modificationList.isEmpty()) {
- modificationList.remove(modificationList.size() - 1);
- } else {
- modificationList.add(decodeModification(line));
+ final BufferedReader reader;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+ } catch (FileNotFoundException e) {
+ logger.debug(NO_MODIFICATION_MSG, file);
+
+ // return empty iterator
+ return new Iterator<Modification>() {
+ @Override
+ public boolean hasNext() {
+ return false;
}
- truncatedSize = reader.position();
+
+ @Override
+ public Modification next() {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+
+ final Modification[] cachedModification = new Modification[1];
+ return new Iterator<Modification>() {
+ @Override
+ public boolean hasNext() {
+ try {
+ if (cachedModification[0] == null) {
+ String line = reader.readLine();
+ if (line == null) {
+ reader.close();
+ return false;
+ } else {
+ return decodeModificationAndCache(reader, cachedModification,
line);
+ }
+ }
+ } catch (IOException e) {
+ logger.warn("An error occurred when reading modifications", e);
+ }
+ return true;
}
- } catch (IOException e) {
- crashed = true;
- logger.error(
- "An error occurred when reading modifications, and the remaining
modifications will be truncated to size {}.",
- truncatedSize,
- e);
- }
-
- if (crashed) {
- try (FileOutputStream outputStream = new FileOutputStream(file, true)) {
- outputStream.getChannel().truncate(truncatedSize);
- } catch (FileNotFoundException e) {
- logger.debug("No modification has been written to this file");
- } catch (IOException e) {
- logger.error(
- "An error occurred when truncating modifications to size {}.",
truncatedSize, e);
+
+ @Override
+ public Modification next() {
+ if (cachedModification[0] == null) {
+ throw new NoSuchElementException();
+ }
+ Modification result = cachedModification[0];
+ cachedModification[0] = null;
+ return result;
}
+ };
+ }
+
+ private boolean decodeModificationAndCache(
+ BufferedReader reader, Modification[] cachedModification, String line)
throws IOException {
+ try {
+ cachedModification[0] = decodeModification(line);
+ return true;
+ } catch (IOException e) {
+ logger.warn("An error occurred when decode line-[{}] to modification",
line);
+ cachedModification[0] = null;
+ reader.close();
+ return false;
}
- return modificationList;
}
@Override
@@ -114,23 +157,71 @@ public class LocalTextModificationAccessor
}
@Override
- public void abort() throws IOException {
+ public void write(Modification mod) throws IOException {
if (writer == null) {
writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
}
- writer.write(ABORT_MARK);
+ writer.write(encodeModification(mod));
writer.newLine();
writer.flush();
}
- @Override
- public void write(Modification mod) throws IOException {
+ @TestOnly
+ public void writeInComplete(Modification mod) throws IOException {
if (writer == null) {
writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
}
- writer.write(encodeModification(mod));
- writer.newLine();
- writer.flush();
+ String line = encodeModification(mod);
+ if (line != null) {
+ writer.write(line.substring(0, 2));
+ }
+ }
+
+ @TestOnly
+ public void writeMeetException(Modification mod) throws IOException {
+ if (writer == null) {
+ writer = FSFactoryProducer.getFSFactory().getBufferedWriter(filePath,
true);
+ }
+ writeInComplete(mod);
+ throw new IOException();
+ }
+
+ @Override
+ public void truncate(long size) {
+ try (FileOutputStream outputStream =
+ new
FileOutputStream(FSFactoryProducer.getFSFactory().getFile(filePath), true)) {
+ outputStream.getChannel().truncate(size);
+ logger.warn("The modifications[{}] will be truncated to size {}.",
filePath, size);
+ } catch (FileNotFoundException e) {
+ logger.debug(NO_MODIFICATION_MSG, filePath);
+ } catch (IOException e) {
+ logger.error(
+ "An error occurred when truncating modifications[{}] to size {}.",
filePath, size, e);
+ }
+ }
+
+ @Override
+ public void mayTruncateLastLine() {
+ try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
+ long filePointer = file.length() - 1;
+ if (filePointer == 0) {
+ return;
+ }
+
+ file.seek(filePointer);
+ byte lastChar = file.readByte();
+ if (lastChar != '\n') {
+ while (filePointer > -1 && lastChar != '\n') {
+ file.seek(filePointer);
+ filePointer--;
+ lastChar = file.readByte();
+ }
+ logger.warn("The last line of Mods is incomplete, will be truncated");
+ truncate(filePointer + 2);
+ }
+ } catch (IOException e) {
+ logger.error("An error occurred when reading modifications", e);
+ }
}
private static String encodeModification(Modification mod) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
index a10b9ca7161..2da82b13d34 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationReader.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import java.io.IOException;
import java.util.Collection;
+import java.util.Iterator;
/** ModificationReader reads all modifications from a persistent medium like
file system. */
public interface ModificationReader {
@@ -35,6 +36,14 @@ public interface ModificationReader {
*/
Collection<Modification> read();
+ /**
+ * Get an iterator over this mod file, others keep consistence with {@link
#read()}. Please ensure
+ * you have called hasNext() with return of {@code true} before calling
next().
+ *
+ * @return the modification iterator.
+ */
+ Iterator<Modification> getModificationIterator();
+
/** Release resources like streams. */
void close() throws IOException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
index b59decbcd21..d2213d7d8ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java
@@ -37,9 +37,10 @@ public interface ModificationWriter {
*/
void write(Modification mod) throws IOException;
+ void truncate(long size);
+
+ void mayTruncateLastLine();
+
/** Release resources like streams. */
void close() throws IOException;
-
- /** Abort last modification. Notice that after calling abort(), a fileWriter
is opened. */
- void abort() throws IOException;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/utils/TracedBufferedReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/utils/TracedBufferedReader.java
deleted file mode 100644
index 9394a0cfeff..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/utils/TracedBufferedReader.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.storageengine.dataregion.modification.utils;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.UncheckedIOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * Copied from {@link java.io.BufferedReader}, trace the read position by
modifying the fill()
- * method.
- */
-public class TracedBufferedReader extends Reader {
- private Reader in;
-
- private char[] cb;
- private int nChars;
- private int nextChar;
-
- private static final int INVALIDATED = -2;
- private static final int UNMARKED = -1;
- private int markedChar = UNMARKED;
- private int readAheadLimit = 0; /* Valid only when markedChar > 0 */
-
- /** If the next character is a line feed, skip it. */
- private boolean skipLF = false;
-
- /** The skipLF flag when the mark was set. */
- private boolean markedSkipLF = false;
-
- private static final int DEFAULT_CHAR_BUFFER_SIZE = 8192;
- private static final int DEFAULT_EXPECTED_LINE_LENGTH = 80;
-
- /** the total bytes number already filled into cb. */
- private long totalFilledBytesNum = 0;
-
- /**
- * Creates a buffering character-input stream that uses an input buffer of
the specified size.
- *
- * @param in A Reader
- * @param sz Input-buffer size
- * @exception IllegalArgumentException If {@code sz <= 0}
- */
- public TracedBufferedReader(Reader in, int sz) {
- super(in);
- if (sz <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- this.in = in;
- cb = new char[sz];
- nextChar = nChars = 0;
- }
-
- /**
- * Creates a buffering character-input stream that uses a default-sized
input buffer.
- *
- * @param in A Reader
- */
- public TracedBufferedReader(Reader in) {
- this(in, DEFAULT_CHAR_BUFFER_SIZE);
- }
-
- /** Checks to make sure that the stream has not been closed. */
- private void ensureOpen() throws IOException {
- if (in == null) {
- throw new IOException("Stream closed");
- }
- }
-
- /** {@link BufferedReader#fill()} */
- private void fill() throws IOException {
- int dst;
- if (markedChar <= UNMARKED) {
- /* No mark */
- dst = 0;
- } else {
- /* Marked */
- int delta = nextChar - markedChar;
- if (delta >= readAheadLimit) {
- /* Gone past read-ahead limit: Invalidate mark. */
- markedChar = INVALIDATED;
- readAheadLimit = 0;
- dst = 0;
- } else {
- if (readAheadLimit <= cb.length) {
- /* Shuffle in the current buffer. */
- System.arraycopy(cb, markedChar, cb, 0, delta);
- markedChar = 0;
- dst = delta;
- } else {
- /* Reallocate buffer to accommodate read-ahead limit. */
- char[] ncb = new char[readAheadLimit];
- System.arraycopy(cb, markedChar, ncb, 0, delta);
- cb = ncb;
- markedChar = 0;
- dst = delta;
- }
- nextChar = nChars = delta;
- }
- }
-
- int n;
- do {
- n = in.read(cb, dst, cb.length - dst);
- } while (n == 0);
- if (n > 0) {
- nChars = dst + n;
- nextChar = dst;
- totalFilledBytesNum = totalFilledBytesNum + n;
- }
- }
-
- /** {@link BufferedReader#read()} */
- @Override
- public int read() throws IOException {
- synchronized (lock) {
- ensureOpen();
- for (; ; ) {
- if (nextChar >= nChars) {
- fill();
- if (nextChar >= nChars) {
- return -1;
- }
- }
- if (skipLF) {
- skipLF = false;
- if (cb[nextChar] == '\n') {
- nextChar++;
- continue;
- }
- }
- return cb[nextChar++];
- }
- }
- }
-
- /** {@link BufferedReader#read1(char[], int, int)} */
- private int read1(char[] cbuf, int off, int len) throws IOException {
- if (nextChar >= nChars) {
- /* If the requested length is at least as large as the buffer, and
- if there is no mark/reset activity, and if line feeds are not
- being skipped, do not bother to copy the characters into the
- local buffer. In this way buffered streams will cascade
- harmlessly. */
- if (len >= cb.length && markedChar <= UNMARKED && !skipLF) {
- return in.read(cbuf, off, len);
- }
- fill();
- }
- if (nextChar >= nChars) {
- return -1;
- }
- if (skipLF) {
- skipLF = false;
- if (cb[nextChar] == '\n') {
- nextChar++;
- if (nextChar >= nChars) {
- fill();
- }
- if (nextChar >= nChars) {
- return -1;
- }
- }
- }
- int n = Math.min(len, nChars - nextChar);
- System.arraycopy(cb, nextChar, cbuf, off, n);
- nextChar += n;
- return n;
- }
-
- /** {@link BufferedReader#read(char[], int, int)} */
- @Override
- public int read(char[] cbuf, int off, int len) throws IOException {
- synchronized (lock) {
- ensureOpen();
- if ((off < 0)
- || (off > cbuf.length)
- || (len < 0)
- || ((off + len) > cbuf.length)
- || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
- int n = read1(cbuf, off, len);
- if (n <= 0) {
- return n;
- }
- while ((n < len) && in.ready()) {
- int n1 = read1(cbuf, off + n, len - n);
- if (n1 <= 0) {
- break;
- }
- n += n1;
- }
- return n;
- }
- }
-
- /** {@link BufferedReader#readLine(boolean)} */
- String readLine(boolean ignoreLF) throws IOException {
- StringBuilder s = null;
- int startChar;
-
- synchronized (lock) {
- ensureOpen();
- boolean omitLF = ignoreLF || skipLF;
-
- bufferLoop:
- for (; ; ) {
-
- if (nextChar >= nChars) {
- fill();
- }
- if (nextChar >= nChars) {
- /* EOF */
- if (s != null && s.length() > 0) {
- return s.toString();
- } else {
- return null;
- }
- }
- boolean eol = false;
- char c = 0;
- int i;
-
- /* Skip a leftover '\n', if necessary. */
- if (omitLF && (cb[nextChar] == '\n')) {
- nextChar++;
- }
- skipLF = false;
- omitLF = false;
-
- charLoop:
- for (i = nextChar; i < nChars; i++) {
- c = cb[i];
- if ((c == '\n') || (c == '\r')) {
- eol = true;
- break charLoop;
- }
- }
-
- startChar = nextChar;
- nextChar = i;
-
- if (eol) {
- String str;
- if (s == null) {
- str = new String(cb, startChar, i - startChar);
- } else {
- s.append(cb, startChar, i - startChar);
- str = s.toString();
- }
- nextChar++;
- if (c == '\r') {
- skipLF = true;
- if (read() != -1) {
- nextChar--;
- }
- }
- return str;
- }
-
- if (s == null) {
- s = new StringBuilder(DEFAULT_EXPECTED_LINE_LENGTH);
- }
- s.append(cb, startChar, i - startChar);
- }
- }
- }
-
- /** {@link BufferedReader#readLine()} */
- public String readLine() throws IOException {
- return readLine(false);
- }
-
- /** {@link BufferedReader#skip(long)} */
- @Override
- public long skip(long n) throws IOException {
- if (n < 0L) {
- throw new IllegalArgumentException("skip value is negative");
- }
- synchronized (lock) {
- ensureOpen();
- long r = n;
- while (r > 0) {
- if (nextChar >= nChars) {
- fill();
- }
- if (nextChar >= nChars) {
- /* EOF */
- break;
- }
- if (skipLF) {
- skipLF = false;
- if (cb[nextChar] == '\n') {
- nextChar++;
- }
- }
- long d = (long) nChars - nextChar;
- if (r <= d) {
- nextChar += r;
- r = 0;
- break;
- } else {
- r -= d;
- nextChar = nChars;
- }
- }
- return n - r;
- }
- }
-
- /** {@link BufferedReader#ready()} */
- @Override
- public boolean ready() throws IOException {
- synchronized (lock) {
- ensureOpen();
-
- /*
- * If newline needs to be skipped and the next char to be read
- * is a newline character, then just skip it right away.
- */
- if (skipLF) {
- /* Note that in.ready() will return true if and only if the next
- * read on the stream will not block.
- */
- if (nextChar >= nChars && in.ready()) {
- fill();
- }
- if (nextChar < nChars) {
- if (cb[nextChar] == '\n') {
- nextChar++;
- }
- skipLF = false;
- }
- }
- return (nextChar < nChars) || in.ready();
- }
- }
-
- /** {@link BufferedReader#markSupported()} */
- @Override
- public boolean markSupported() {
- return true;
- }
-
- /** {@link BufferedReader#mark(int)} */
- @Override
- public void mark(int readAheadLimit) throws IOException {
- if (readAheadLimit < 0) {
- throw new IllegalArgumentException("Read-ahead limit < 0");
- }
- synchronized (lock) {
- ensureOpen();
- this.readAheadLimit = readAheadLimit;
- markedChar = nextChar;
- markedSkipLF = skipLF;
- }
- }
-
- /** {@link BufferedReader#reset()} */
- @Override
- public void reset() throws IOException {
- synchronized (lock) {
- ensureOpen();
- if (markedChar < 0) {
- throw new IOException((markedChar == INVALIDATED) ? "Mark invalid" :
"Stream not marked");
- }
- nextChar = markedChar;
- skipLF = markedSkipLF;
- }
- }
-
- /** {@link BufferedReader#close()} */
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- if (in == null) {
- return;
- }
- try {
- in.close();
- } finally {
- in = null;
- cb = null;
- }
- }
- }
-
- /** {@link BufferedReader#lines()} */
- public Stream<String> lines() {
- Iterator<String> iter =
- new Iterator<String>() {
- String nextLine = null;
-
- @Override
- public boolean hasNext() {
- if (nextLine != null) {
- return true;
- } else {
- try {
- nextLine = readLine();
- return (nextLine != null);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
-
- @Override
- public String next() {
- if (nextLine != null || hasNext()) {
- String line = nextLine;
- nextLine = null;
- return line;
- } else {
- throw new NoSuchElementException();
- }
- }
- };
- return StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED |
Spliterator.NONNULL),
- false);
- }
-
- /**
- * Returns this reader's file position.
- *
- * @return This reader's file position, a non-negative integer counting the
number of bytes from
- * the beginning of the file to the current position
- */
- public long position() {
- // position = totalFilledBytesNum - lastFilledBytesNum +
readOffsetInLastFilledBytes
- // lastFilledBytesNum = nChars - dst, readOffsetInLastFilledBytes =
nextChar - dst
- return totalFilledBytesNum - nChars + nextChar;
- }
-}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
index a6bbe3e226e..ad6625b2bc3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.modification;
import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.io.LocalTextModificationAccessor;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.junit.Assert;
@@ -72,33 +73,25 @@ public class ModificationFileTest {
}
@Test
- public void testAbort() {
+ public void writeVerifyTest() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
Modification[] modifications =
new Modification[] {
new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
- new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
- new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3, 4),
- new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4, 5),
+ new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2)
};
try (ModificationFile mFile = new ModificationFile(tempFileName)) {
- for (int i = 0; i < 2; i++) {
- mFile.write(modifications[i]);
+ mFile.write(modifications[0]);
+ try (LocalTextModificationAccessor accessor =
+ new LocalTextModificationAccessor(tempFileName)) {
+ accessor.writeInComplete(modifications[1]);
}
+ mFile.write(modifications[1]);
List<Modification> modificationList = (List<Modification>)
mFile.getModifications();
+ assertEquals(2, modificationList.size());
for (int i = 0; i < 2; i++) {
assertEquals(modifications[i], modificationList.get(i));
}
-
- for (int i = 2; i < 4; i++) {
- mFile.write(modifications[i]);
- }
- modificationList = (List<Modification>) mFile.getModifications();
- mFile.abort();
-
- for (int i = 0; i < 3; i++) {
- assertEquals(modifications[i], modificationList.get(i));
- }
} catch (IOException e) {
fail(e.getMessage());
} finally {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
index 136531eb238..e40f3c7bcc1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessorTest.java
@@ -24,11 +24,10 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.junit.AfterClass;
import org.junit.Test;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,17 +37,53 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class LocalTextModificationAccessorTest {
+ private static Modification[] modifications =
+ new Modification[] {
+ new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
+ new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
+ new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
+ new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
+ };
+
+ @AfterClass
+ public static void tearDown() {
+ modifications = null;
+ }
+
+ @Test
+ public void writeMeetException() throws IOException {
+ String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
+ long length = 0;
+ LocalTextModificationAccessor accessor = null;
+ try {
+ accessor = new LocalTextModificationAccessor(tempFileName);
+ for (int i = 0; i < 2; i++) {
+ accessor.write(modifications[i]);
+ }
+ length = new File(tempFileName).length();
+ // the current line should be truncated when meet exception
+ accessor.writeMeetException(modifications[2]);
+ for (int i = 2; i < 4; i++) {
+ accessor.write(modifications[i]);
+ }
+ List<Modification> modificationList = (List<Modification>)
accessor.read();
+ assertEquals(4, modificationList.size());
+ for (int i = 0; i < 4; i++) {
+ assertEquals(modifications[i], modificationList.get(i));
+ }
+ } catch (IOException e) {
+ accessor.truncate(length);
+ } finally {
+ if (accessor != null) {
+ accessor.close();
+ }
+ new File(tempFileName).delete();
+ }
+ }
@Test
public void readMyWrite() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
- Modification[] modifications =
- new Modification[] {
- new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
- new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
- new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
- new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
- };
try (LocalTextModificationAccessor accessor = new
LocalTextModificationAccessor(tempFileName)) {
for (int i = 0; i < 2; i++) {
accessor.write(modifications[i]);
@@ -75,52 +110,41 @@ public class LocalTextModificationAccessorTest {
@Test
public void readNull() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
- LocalTextModificationAccessor accessor = new
LocalTextModificationAccessor(tempFileName);
- new File(tempFileName).delete();
- Collection<Modification> modifications = accessor.read();
- assertEquals(new ArrayList<>(), modifications);
+ try (LocalTextModificationAccessor accessor = new
LocalTextModificationAccessor(tempFileName)) {
+ new File(tempFileName).delete();
+ Collection<Modification> modifications = accessor.read();
+ assertEquals(new ArrayList<>(), modifications);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
}
@Test
- public void readAndTruncate() {
+ public void readMeetError() {
String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
File file = new File(tempFileName);
if (file.exists()) {
file.delete();
}
- Modification[] modifications =
- new Modification[] {
- new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
- new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
- new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
- new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
- };
- try (LocalTextModificationAccessor accessor = new
LocalTextModificationAccessor(tempFileName);
- BufferedWriter writer = new BufferedWriter(new
FileWriter(tempFileName, true))) {
+ try (LocalTextModificationAccessor accessor = new
LocalTextModificationAccessor(tempFileName)) {
// write normal message
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 4; i++) {
accessor.write(modifications[i]);
}
List<Modification> modificationList = (List<Modification>)
accessor.read();
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 4; i++) {
assertEquals(modifications[i], modificationList.get(i));
}
// write error message
- long length = file.length();
- writer.write("error");
- writer.newLine();
- writer.flush();
- // write normal message & read
- for (int i = 2; i < 4; i++) {
- accessor.write(modifications[i]);
- }
+ accessor.writeInComplete(modifications[0]);
+
modificationList = (List<Modification>) accessor.read();
- for (int i = 0; i < 2; i++) {
+ // the error line is ignored
+ assertEquals(4, modificationList.size());
+ for (int i = 0; i < 4; i++) {
System.out.println(modificationList);
assertEquals(modifications[i], modificationList.get(i));
}
- // check truncated file
- assertEquals(length, file.length());
} catch (IOException e) {
fail(e.getMessage());
} finally {