This is an automated email from the ASF dual-hosted git repository.
rgoers pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0c892eb Try to fix failing test
0c892eb is described below
commit 0c892ebc7c8d9bfa559b1204c331c525fecce6d0
Author: Ralph Goers <[email protected]>
AuthorDate: Sun Jan 30 00:16:35 2022 -0700
Try to fix failing test
---
.../avro/ReliableSpoolingFileEventReader.java | 21 +++++++++++++++++++++
.../avro/TestReliableSpoolingFileEventReader.java | 12 +++++++++++-
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index dde084e..41cfe96 100644
---
a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -53,6 +53,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -92,6 +95,7 @@ public class ReliableSpoolingFileEventReader implements
ReliableEventReader {
.getLogger(ReliableSpoolingFileEventReader.class);
static final String metaFileName = ".flumespool-main.meta";
+ private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");
private final File spoolDirectory;
private final Path spoolDirPath;
private final String completedSuffix;
@@ -348,6 +352,23 @@ public class ReliableSpoolingFileEventReader implements
ReliableEventReader {
return lastFileRead.get().getFile().getAbsolutePath();
}
+ /**
+ * Return the name, last-modified time, and size of the file which generated
+ * the data from the last successful {@link #readEvents(int)} call. Returns
+ * null if called before any file contents are read.
+ */
+ public String getLastFileReadInfo() {
+ if (!lastFileRead.isPresent()) {
+ return null;
+ }
+ FileInfo fileInfo = lastFileRead.get();
+ StringBuilder sb = new StringBuilder(fileInfo.getFile().getName());
+ sb.append(" lastModified:
").append(FORMATTER.format(Instant.ofEpochMilli(fileInfo.lastModified)
+ .atZone(ZoneId.systemDefault()).toLocalDateTime()));
+ sb.append(" size: ").append(fileInfo.length);
+ return sb.toString();
+ }
+
// public interface
public Event readEvent() throws IOException {
List<Event> events = readEvents(1);
diff --git
a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index cfd0a60..0d44824 100644
---
a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -75,7 +75,7 @@ public class TestReliableSpoolingFileEventReader {
if (!WORK_DIR.isDirectory()) {
Files.createParentDirs(new File(WORK_DIR, "dummy"));
}
-
+ long lastModified = 0;
// write out a few files
for (int i = 0; i < 4; i++) {
File fileName = new File(WORK_DIR, "file" + i);
@@ -86,6 +86,12 @@ public class TestReliableSpoolingFileEventReader {
sb.append("file" + i + "line" + j + "\n");
}
Files.write(sb.toString(), fileName, Charsets.UTF_8);
+ // Make sure all the files have the same timestamp.
+ if (lastModified == 0) {
+ lastModified = fileName.lastModified();
+ } else {
+ fileName.setLastModified(lastModified);
+ }
}
Thread.sleep(1500L); // make sure timestamp is later
Files.write("\n", new File(WORK_DIR, "emptylineFile"), Charsets.UTF_8);
@@ -699,6 +705,10 @@ public class TestReliableSpoolingFileEventReader {
for (int i = 0; i < listFiles(dir).size(); i++) {
events = reader.readEvents(10);
for (Event e : events) {
+ if (reader instanceof ReliableSpoolingFileEventReader) {
+ logger.debug("Adding event for file {} with body \"{}\"",
+ ((ReliableSpoolingFileEventReader)
reader).getLastFileReadInfo(), new String(e.getBody()));
+ }
actual.add(new String(e.getBody()));
}
reader.commit();