This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new aadeaa0d39d YARN-11916. FileSystemTimelineReaderImpl vulnerable to
race conditions (#8164)
aadeaa0d39d is described below
commit aadeaa0d39d3422d5cf74e3af9f6b7317f11c9d4
Author: Steve Loughran <[email protected]>
AuthorDate: Tue Jan 6 19:25:59 2026 +0000
YARN-11916. FileSystemTimelineReaderImpl vulnerable to race conditions
(#8164)
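Move the FileSystemTimelineReaderImpl and FileSystemTimelineWriterImpl classes
from src/main to src/test. They were never public API.
Add the hadoop-yarn-server-timelineservice test-jar as a test-scope dependency
of hadoop-yarn-client for its tests.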
---
.../hadoop-yarn/hadoop-yarn-client/pom.xml | 6 +
.../timelineservice/storage/TimelineWriter.java | 4 +-
.../storage/FileSystemTimelineReaderImpl.java | 10 +-
.../storage/FileSystemTimelineWriterImpl.java | 101 ++++++++--
.../storage/TestFileSystemTimelineWriterImpl.java | 205 ++++++++++++---------
5 files changed, 229 insertions(+), 97 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 6e7f9d243d2..904b76f452e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -158,6 +158,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index ccc74910377..d6d7ec3d922 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -73,6 +73,8 @@ TimelineWriteResponse write(TimelineCollectorContext context,
*
* Any errors occurring for individual write request objects will be reported
* in the response.
+ * <p>
+ * This method is not invoked or tested anywhere, and all implementations
+ * return null.
*
* @param data
* a {@link TimelineEntity} object
@@ -80,7 +82,7 @@ TimelineWriteResponse write(TimelineCollectorContext context,
* value.
* @param track Specifies the track or dimension along which aggregation
would
* occur. Includes USER, FLOW, QUEUE, etc.
- * @return a {@link TimelineWriteResponse} object.
+ * @return a {@link TimelineWriteResponse} object. All implementations
return null.
* @throws IOException if there is any exception encountered while
aggregating
* entities to the backend storage.
*/
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
similarity index 97%
rename from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
rename to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 2e771fc77e8..d9bee2db93f 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -39,6 +39,8 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -61,12 +63,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
+
/**
* File System based implementation for TimelineReader. This implementation
may
* not provide a complete implementation of all the necessary features. This
* implementation is provided solely for basic testing purposes, and should
not
* be used in a non-test situation.
*/
[email protected]
[email protected]
public class FileSystemTimelineReaderImpl extends AbstractService
implements TimelineReader {
@@ -164,7 +170,9 @@ private static void fillFields(TimelineEntity finalEntity,
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
- return userId + File.separator + flowName + File.separator + "*" +
File.separator + flowRunId;
+ return escape(userId) + File.separator
+ + escape(flowName) + File.separator
+ + "*" + File.separator + flowRunId;
}
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
similarity index 75%
rename from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
rename to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index e658aad1eaa..82d15de380d 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@@ -51,7 +52,27 @@
* This implements a FileSystem based backend for storing application timeline
* information. This implementation may not provide a complete implementation
of
* all the necessary features. This implementation is provided solely for basic
- * testing purposes, and should not be used in a non-test situation.
+ * testing purposes, and MUST NOT be used in a non-test situation.
+ * <p>
+ * Key limitations are:
+ * <ol>
+ * <li>Inadequate scalability and concurrency for production use</li>
+ * <li>Weak security: any authenticated caller can add events to any
application
+ * timeline.</li>
+ * </ol>
+ * <p>
+ * To implement an atomic append it reads all the data in the original file,
+ * writes that to a temporary file, appends the new
+ * data there and renames that temporary file to the original path.
+ * This makes the update operation slower and slower the longer an application
runs.
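+ * writeInternal() is synchronized, so updates made through one writer
+ * instance are serialized. But if another writer (for example in another
+ * process) updates the same file while an update is in progress, both read
+ * the same previous state of the log. Whichever rename completes last
+ * silently discards the other update's changes.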
+ * <p>
+ * This is not a database. Apache HBase is. Use it.
+ * <p>
+ * The only realistic justification for this is if you are writing code which
updates
+ * the timeline service and you want something easier to debug in unit tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -89,8 +110,16 @@ public class FileSystemTimelineWriterImpl extends
AbstractService
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineWriter.class);
+ /**
+ * Logger bound to this class (LOG above is bound to
+ * FileSystemTimelineWriter).
+ */
+ private static final Logger LOGIMPL =
+ LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);
+
+ /**
+ * Shared (static) warning used by the constructor, so the
+ * "not for production use" message is logged only once however many
+ * writers are created.
+ */
+ public static final LogExactlyOnce WARNING_OF_USE =
+ new LogExactlyOnce(LOGIMPL);
+
FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
+ WARNING_OF_USE.warn("This timeline writer is neither safe nor scalable"
+ + " enough to be used in production.");
}
@Override
@@ -126,21 +155,19 @@ private synchronized void writeInternal(String clusterId,
String userId,
TimelineEntity entity,
TimelineWriteResponse response)
throws IOException {
- String entityTypePathStr = clusterId + File.separator + userId +
- File.separator + escape(flowName) + File.separator +
- escape(flowVersion) + File.separator + flowRun + File.separator + appId
- + File.separator + entity.getType();
+ final String entityTypePathStr =
+ buildEntityTypeSubpath(clusterId, userId, flowName, flowVersion,
flowRun, appId, entity.getType());
Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
try {
mkdirs(entityTypePath);
Path filePath =
new Path(entityTypePath,
- entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+ escape(entity.getId(), "id") +
TIMELINE_SERVICE_STORAGE_EXTENSION);
createFileWithRetries(filePath);
- byte[] record = new StringBuilder()
- .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
- .append("\n").toString().getBytes(StandardCharsets.UTF_8);
+ byte[] record =
+ (TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
+ .getBytes(StandardCharsets.UTF_8);
writeFileWithRetries(filePath, record);
} catch (Exception ioe) {
LOG.warn("Interrupted operation:{}", ioe.getMessage());
@@ -153,6 +180,35 @@ private synchronized void writeInternal(String clusterId,
String userId,
}
}
+ /**
+ * Given the various attributes of an entity, return the string subpath
+ * of the directory holding entities of that type:
+ * {@code clusterId/userId/flowName/flowVersion/flowRun/appId/type}.
+ * <p>
+ * Every element except clusterId and flowRun is passed through
+ * {@link #escape(String, String)}. userId is escaped the same way
+ * FileSystemTimelineReaderImpl escapes it when building its search path,
+ * so that the reader and writer agree on the directory name.
+ * An empty type is mapped to "type".
+ * @param clusterId cluster ID
+ * @param userId user ID
+ * @param flowName flow name
+ * @param flowVersion flow version
+ * @param flowRun flow run
+ * @param appId application ID
+ * @param type entity type
+ * @return the subpath for records.
+ */
+ @VisibleForTesting
+ public static String buildEntityTypeSubpath(final String clusterId,
+ final String userId,
+ final String flowName,
+ final String flowVersion,
+ final long flowRun,
+ final String appId,
+ final String type) {
+ return clusterId
+ + File.separator + escape(userId, "")
+ + File.separator + escape(flowName, "")
+ + File.separator + escape(flowVersion, "")
+ + File.separator + flowRun
+ + File.separator + escape(appId, "")
+ + File.separator + escape(type, "type");
+ }
+
private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
@@ -316,8 +372,29 @@ protected void writeFile(Path outputPath, byte[] data)
throws IOException {
}
}
- // specifically escape the separator character
- private static String escape(String str) {
- return str.replace(File.separatorChar, '_');
+ /**
+ * Escape a string for use as a single path element; equivalent to
+ * {@code escape(str, "")}.
+ * @param str input string; may be null or empty
+ * @return the escaped string, or "" for null or empty input.
+ */
+ @VisibleForTesting
+ public static String escape(String str) {
+ return escape(str, "");
+ }
+
+ /**
+ * Escape a string for use as a single path element.
+ * <p>
+ * '/', the platform file separator, '?' and ':' are each replaced by '_'.
+ * If the result would be exactly "." or "..", its dots are replaced by '_'
+ * too, so the element cannot resolve to the current or parent directory.
+ * Null or empty strings are mapped to a fallback string, which may itself
+ * be empty.
+ * @param str input string; may be null or empty
+ * @param fallback string returned for null or empty input
+ * @return a string with none of the escaped characters.
+ */
+ @VisibleForTesting
+ public static String escape(String str, final String fallback) {
+ if (str == null || str.isEmpty()) {
+ return fallback;
+ }
+ // Hadoop Path elements are separated by '/' on every platform, which
+ // may differ from the local File.separatorChar; escape both.
+ final String escaped = str.replace('/', '_')
+ .replace(File.separatorChar, '_')
+ .replace('?', '_')
+ .replace(':', '_');
+ if (escaped.equals(".") || escaped.equals("..")) {
+ return escaped.replace('.', '_');
+ }
+ return escaped;
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index efed104eeea..8297f17e4b1 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -27,14 +27,17 @@
import java.util.List;
import java.util.Map;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -43,11 +46,16 @@
import
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
+import static
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.buildEntityTypeSubpath;
+import static
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
+
+public class TestFileSystemTimelineWriterImpl extends AbstractHadoopTestBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestFileSystemTimelineWriterImpl.class);
+
+ public static final String UP = ".." + File.separator;
-public class TestFileSystemTimelineWriterImpl {
@TempDir
private File tmpFolder;
@@ -84,12 +92,10 @@ void testWriteEntityToFile() throws Exception {
te.addEntity(entity2);
Map<String, TimelineMetric> aggregatedMetrics =
- new HashMap<String, TimelineMetric>();
+ new HashMap<>();
aggregatedMetrics.put(metricId, metric);
- FileSystemTimelineWriterImpl fsi = null;
- try {
- fsi = new FileSystemTimelineWriterImpl();
+ try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -105,47 +111,43 @@ void testWriteEntityToFile() throws Exception {
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
- File.separator + type + File.separator + id +
+ File.separator + type
+ + File.separator + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- Path path = new Path(fileName);
- FileSystem fs = FileSystem.get(conf);
- assertTrue(fs.exists(path),
- "Specified path(" + fileName + ") should exist: ");
- FileStatus fileStatus = fs.getFileStatus(path);
- assertFalse(fileStatus.isDirectory(), "Specified path should be a file");
- List<String> data = readFromFile(fs, path);
+ List<String> data = readFromFile(FileSystem.get(conf), new
Path(fileName), 2);
// ensure there's only one entity + 1 new line
- assertEquals(2, data.size(), "data size is:" + data.size());
- String d = data.get(0);
+ Assertions.assertThat(data).hasSize(2);
// confirm the contents same as what was written
- assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ assertRecordMatches(data.get(0), entity);
// verify aggregated metrics
String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
- File.separator + type2 + File.separator + id2 +
+ File.separator + type2
+ + File.separator + id2 +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path2 = new Path(fileName2);
- assertTrue(fs.exists(path2),
- "Specified path(" + fileName + ") should exist: ");
- FileStatus fileStatus2 = fs.getFileStatus(path2);
- assertFalse(fileStatus2.isDirectory(), "Specified path should be a
file");
- List<String> data2 = readFromFile(fs, path2);
+ List<String> data2 = readFromFile(FileSystem.get(conf), path2, 2);
// ensure there's only one entity + 1 new line
- assertEquals(2, data2.size(), "data size is:" + data2.size());
- String metricToString = data2.get(0);
+ Assertions.assertThat(data2).hasSize(2);
// confirm the contents same as what was written
- assertEquals(metricToString,
- TimelineUtils.dumpTimelineRecordtoJSON(entity2));
- } finally {
- if (fsi != null) {
- fsi.close();
- }
+ assertRecordMatches(data2.get(0), entity2);
}
}
+ /**
+ * Assert that a line read back from a timeline file equals the JSON
+ * serialization of the given entity.
+ * @param d record line read from the file
+ * @param entity expected entity
+ * @throws IOException if serializing the entity via TimelineUtils fails
+ */
+ private static void assertRecordMatches(final String d,
+ final TimelineEntity entity)
+ throws IOException {
+ Assertions.assertThat(d)
+ .isEqualTo(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ }
+
@Test
void testWriteMultipleEntities() throws Exception {
String id = "appId";
@@ -165,9 +167,7 @@ void testWriteMultipleEntities() throws Exception {
entity2.setCreatedTime(1425016503000L);
te2.addEntity(entity2);
- FileSystemTimelineWriterImpl fsi = null;
- try {
- fsi = new FileSystemTimelineWriterImpl();
+ try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -183,33 +183,19 @@ void testWriteMultipleEntities() throws Exception {
"flow_version", 12345678L, "app_id"),
te2, UserGroupInformation.createRemoteUser("user_id"));
- String fileName = outputRoot + File.separator + "entities" +
- File.separator + "cluster_id" + File.separator + "user_id" +
- File.separator + "flow_name" + File.separator + "flow_version" +
- File.separator + "12345678" + File.separator + "app_id" +
- File.separator + type + File.separator + id +
- FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ String fileName = outputRoot + File.separator + "entities"
+ + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
+ "flow_name" ,"flow_version" ,12345678, "app_id", type)
+ + File.separator + id
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = new Path(fileName);
FileSystem fs = FileSystem.get(conf);
- assertTrue(fs.exists(path),
- "Specified path(" + fileName + ") should exist: ");
- FileStatus fileStatus = fs.getFileStatus(path);
- assertFalse(fileStatus.isDirectory(), "Specified path should be a file");
- List<String> data = readFromFile(fs, path);
- assertEquals(3, data.size(), "data size is:" + data.size());
- String d = data.get(0);
+ List<String> data = readFromFile(fs, path, 3);
// confirm the contents same as what was written
- assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
-
+ assertRecordMatches(data.get(0), entity);
- String metricToString = data.get(1);
// confirm the contents same as what was written
- assertEquals(metricToString,
- TimelineUtils.dumpTimelineRecordtoJSON(entity2));
- } finally {
- if (fsi != null) {
- fsi.close();
- }
+ assertRecordMatches(data.get(1), entity2);
}
}
@@ -225,9 +211,7 @@ void testWriteEntitiesWithEmptyFlowName() throws Exception {
entity.setCreatedTime(1425016501000L);
te.addEntity(entity);
- FileSystemTimelineWriterImpl fsi = null;
- try {
- fsi = new FileSystemTimelineWriterImpl();
+ try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
@@ -239,32 +223,86 @@ void testWriteEntitiesWithEmptyFlowName() throws
Exception {
"flow_version", 12345678L, "app_id"),
te, UserGroupInformation.createRemoteUser("user_id"));
- String fileName = outputRoot + File.separator + "entities" +
- File.separator + "cluster_id" + File.separator + "user_id" +
- File.separator + "" + File.separator + "flow_version" +
- File.separator + "12345678" + File.separator + "app_id" +
- File.separator + type + File.separator + id +
- FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- Path path = new Path(fileName);
- FileSystem fs = FileSystem.get(conf);
- assertTrue(fs.exists(path),
- "Specified path(" + fileName + ") should exist: ");
- FileStatus fileStatus = fs.getFileStatus(path);
- assertFalse(fileStatus.isDirectory(), "specified path should be a file");
- List<String> data = readFromFile(fs, path);
- assertEquals(2, data.size(), "data size is:" + data.size());
- String d = data.get(0);
+ String fileName = outputRoot + File.separator + "entities"
+ + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
+ "" ,"flow_version" ,12345678, "app_id", type)
+ + File.separator + id
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+ List<String> data = readFromFile(FileSystem.get(conf), new
Path(fileName), 2);
+ // confirm the contents same as what was written
+ assertRecordMatches(data.get(0), entity);
+ }
+ }
+
+ /**
+ * Stress test the escaping logic.
+ * The entity id, entity type, flow name and flow version all start with
+ * UP (".." plus the file separator); the flow name also contains '?' and
+ * the flow version ends with '/'. The single record must be readable from
+ * the path computed by buildEntityTypeSubpath() and escape().
+ */
+ @Test
+ void testWriteEntitiesWithEscaping() throws Exception {
+ String id = UP + "appid";
+ String type = UP + "type";
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(1425016501000L);
+ te.addEntity(entity);
+
+ try (FileSystemTimelineWriterImpl fsi = new
FileSystemTimelineWriterImpl()) {
+ Configuration conf = new YarnConfiguration();
+ String outputRoot = tmpFolder.getAbsolutePath();
+ conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ outputRoot);
+ fsi.init(conf);
+ fsi.start();
+ final String flowName = UP + "flow_name?";
+ final String flowVersion = UP + "flow_version/";
+ fsi.write(
+ new TimelineCollectorContext("cluster_id", "user_id", flowName,
+ flowVersion, 12345678L, "app_id"),
+ te, UserGroupInformation.createRemoteUser("user_id"));
+
+ String fileName = outputRoot + File.separator + "entities"
+ + File.separator + buildEntityTypeSubpath("cluster_id", "user_id",
+ flowName, flowVersion,12345678, "app_id", type)
+ + File.separator + escape(id, "id")
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+ List<String> data = readFromFile(FileSystem.get(conf), new
Path(fileName), 2);
// confirm the contents same as what was written
- assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
- } finally {
- if (fsi != null) {
- fsi.close();
- }
+ assertRecordMatches(data.get(0), entity);
}
}
- private List<String> readFromFile(FileSystem fs, Path path)
+ /**
+ * Test that escape() replaces the file separator, '?' and ':' with '_',
+ * substitutes the fallback for an empty input, and leaves other strings
+ * unchanged.
+ */
+ @Test
+ public void testEscapingAndFallback() throws Throwable {
+ Assertions.assertThat(escape("", "fallback"))
+ .isEqualTo("fallback");
+ Assertions.assertThat(escape("flow_name", "fallback"))
+ .isEqualTo("flow_name");
+ Assertions.assertThat(escape(File.separator, "fallback"))
+ .isEqualTo("_");
+ Assertions.assertThat(escape("?:", ""))
+ .isEqualTo("__");
+ }
+
+ /**
+ * Read a file line by line, logging its name first and verifying it is
actually a file.
+ * Asserts the number of lines read is as expected.
+ * @param fs fs
+ * @param path path
+ * @param entryCount number of entries expected.
+ * @return a possibly empty list of lines
+ * @throws IOException IO failure
+ */
+ private List<String> readFromFile(FileSystem fs, Path path, int entryCount)
throws IOException {
+
+ LOG.info("Reading file from {}", path);
+ assertIsFile(fs, path);
BufferedReader br = new BufferedReader(
new InputStreamReader(fs.open(path)));
List<String> data = new ArrayList<>();
@@ -274,6 +312,7 @@ private List<String> readFromFile(FileSystem fs, Path path)
line = br.readLine();
data.add(line);
}
+ Assertions.assertThat(data).hasSize(entryCount);
return data;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]