Repository: tez Updated Branches: refs/heads/master 314dfc79b -> 06757e9d0
TEZ-3960. Better error handling in proto history logger and add doAs support. (harishjp) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06757e9d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06757e9d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06757e9d Branch: refs/heads/master Commit: 06757e9d03c613ef5fd1dfcb1d86615ba2222818 Parents: 314dfc7 Author: Harish JP <[email protected]> Authored: Fri Jun 29 22:31:15 2018 +0530 Committer: Harish JP <[email protected]> Committed: Fri Jun 29 22:31:38 2018 +0530 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 11 ++ .../logging/proto/DagManifesFileScanner.java | 105 ++++++++++++++++--- .../logging/proto/DatePartitionedLogger.java | 46 +++++--- .../proto/ProtoHistoryLoggingService.java | 26 +++-- .../logging/proto/ProtoMessageReader.java | 9 +- .../logging/proto/ProtoMessageWriter.java | 12 +-- .../proto/TestDagManifestFileScanner.java | 65 +++++++++++- .../proto/TestProtoHistoryLoggingService.java | 9 +- 8 files changed, 234 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 50b17b9..43014a4 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1474,6 +1474,17 @@ public class TezConfiguration extends Configuration { public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L; /** + * Boolean value. 
Whether the manifest file scanner opens each manifest file as that file's owner, + * through a proxy user created on top of the current user (doAs). When false (the default), + * files are opened as the current user. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_HISTORY_LOGGING_PROTO_DOAS = + TEZ_PREFIX + "history.logging.proto-doas"; + public static final boolean TEZ_HISTORY_LOGGING_PROTO_DOAS_DEFAULT = false; + + /** * Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown. * Expert level setting. */ http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java index c8ea02f..697083c 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java @@ -19,58 +19,75 @@ package org.apache.tez.dag.history.logging.proto; import java.io.Closeable; import java.io.IOException; +import java.security.PrivilegedAction; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.dag.api.TezConfiguration; import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto; import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Helper class to scan all the dag manifest files to get manifest entries. + * Helper class to scan all the dag manifest files to get manifest entries. This class is + * not thread safe. */ public class DagManifesFileScanner implements Closeable { - private static final int OFFSET_VERSION = 1; + private static final Logger LOG = LoggerFactory.getLogger(DagManifesFileScanner.class); + private static final int SCANNER_OFFSET_VERSION = 2; + private static final int MAX_RETRY = 3; private final ObjectMapper mapper = new ObjectMapper(); private final DatePartitionedLogger<ManifestEntryProto> manifestLogger; private final long syncTime; + private final boolean withDoas; private String scanDir; private Map<String, Long> offsets; - private List<Path> newFiles; + private Map<String, Integer> retryCount; + private List<FileStatus> newFiles; private ProtoMessageReader<ManifestEntryProto> reader; + private String currentFilePath; public DagManifesFileScanner(DatePartitionedLogger<ManifestEntryProto> manifestLogger) { this.manifestLogger = manifestLogger; this.syncTime = manifestLogger.getConfig().getLong( TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS, TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT); + this.withDoas = manifestLogger.getConfig().getBoolean( + TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_DOAS, + TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_DOAS_DEFAULT); this.setOffset(LocalDate.ofEpochDay(0)); } + // Update the offset version and checks below to ensure correct versions are supported. // All public to simplify json conversion. 
public static class DagManifestOffset { public int version; public String scanDir; public Map<String, Long> offsets; + public Map<String, Integer> retryCount; } public void setOffset(String offset) { try { DagManifestOffset dagOffset = mapper.readValue(offset, DagManifestOffset.class); - if (dagOffset.version != OFFSET_VERSION) { + if (dagOffset.version > SCANNER_OFFSET_VERSION) { throw new IllegalArgumentException("Version mismatch: " + dagOffset.version); } this.scanDir = dagOffset.scanDir; - this.offsets = dagOffset.offsets; + this.offsets = dagOffset.offsets == null ? new HashMap<>() : dagOffset.offsets; + this.retryCount = dagOffset.retryCount == null ? new HashMap<>() : dagOffset.retryCount; this.newFiles = new ArrayList<>(); } catch (IOException e) { throw new IllegalArgumentException("Invalid offset", e); @@ -80,15 +97,17 @@ public class DagManifesFileScanner implements Closeable { public void setOffset(LocalDate date) { this.scanDir = manifestLogger.getDirForDate(date); this.offsets = new HashMap<>(); + this.retryCount = new HashMap<>(); this.newFiles = new ArrayList<>(); } public String getOffset() { try { DagManifestOffset offset = new DagManifestOffset(); - offset.version = OFFSET_VERSION; + offset.version = SCANNER_OFFSET_VERSION; offset.scanDir = scanDir; offset.offsets = offsets; + offset.retryCount = retryCount; return mapper.writeValueAsString(offset); } catch (IOException e) { throw new RuntimeException("Unexpected exception while converting to json.", e); @@ -98,17 +117,26 @@ public class DagManifesFileScanner implements Closeable { public ManifestEntryProto getNext() throws IOException { while (true) { if (reader != null) { - ManifestEntryProto evt = reader.readEvent(); + ManifestEntryProto evt = null; + try { + evt = reader.readEvent(); + retryCount.remove(currentFilePath); + } catch (IOException e) { + LOG.error("Error trying to read event from file: {}", currentFilePath, e); + incrementError(currentFilePath); + } if (evt != null) { 
offsets.put(reader.getFilePath().getName(), reader.getOffset()); return evt; } else { IOUtils.closeQuietly(reader); reader = null; + currentFilePath = null; } } if (!newFiles.isEmpty()) { - this.reader = manifestLogger.getReader(newFiles.remove(0)); + this.reader = getNextReader(); + this.currentFilePath = reader != null ? reader.getFilePath().toString() : null; } else { if (!loadMore()) { return null; @@ -117,6 +145,32 @@ public class DagManifesFileScanner implements Closeable { } } + private void incrementError(String path) { + int count = retryCount.getOrDefault(path, 0); + retryCount.put(path, count + 1); + } + + private ProtoMessageReader<ManifestEntryProto> getNextReader() throws IOException { + FileStatus status = newFiles.remove(0); + PrivilegedAction<ProtoMessageReader<ManifestEntryProto>> action = () -> { + try { + return manifestLogger.getReader(status.getPath()); + } catch (IOException e) { + String path = status.getPath().toString(); + LOG.error("Error trying to open file: {}", path, e); + incrementError(path); + return null; + } + }; + if (withDoas) { + UserGroupInformation proxyUser = UserGroupInformation.createProxyUser( + status.getOwner(), UserGroupInformation.getCurrentUser()); + return proxyUser.doAs(action); + } else { + return action.run(); + } + } + @Override public void close() throws IOException { if (reader != null) { @@ -125,15 +179,35 @@ public class DagManifesFileScanner implements Closeable { } } - private boolean loadMore() throws IOException { + private void filterErrors(List<FileStatus> files) { + Iterator<FileStatus> iter = files.iterator(); + while (iter.hasNext()) { + FileStatus status = iter.next(); + String path = status.getPath().toString(); + if (retryCount.getOrDefault(path, 0) > MAX_RETRY) { + LOG.warn("Removing file {}, too many errors", path); + iter.remove(); + } + } + } + + private void loadNewFiles(String todayDir) throws IOException { newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets); + if 
(!scanDir.equals(todayDir)) { + filterErrors(newFiles); + } + } + + private boolean loadMore() throws IOException { + LocalDateTime now = manifestLogger.getNow(); + LocalDate today = now.toLocalDate(); + String todayDir = manifestLogger.getDirForDate(today); + loadNewFiles(todayDir); while (newFiles.isEmpty()) { - LocalDateTime utcNow = manifestLogger.getNow(); - if (utcNow.getHour() * 3600 + utcNow.getMinute() * 60 + utcNow.getSecond() < syncTime) { + if (now.getHour() * 3600 + now.getMinute() * 60 + now.getSecond() < syncTime) { // We are in the delay window for today, do not advance date if we are moving from // yesterday. - String yesterDir = manifestLogger.getDirForDate(utcNow.toLocalDate().minusDays(1)); - if (yesterDir.equals(scanDir)) { + if (scanDir.equals(manifestLogger.getDirForDate(today.minusDays(1)))) { return false; } } @@ -143,7 +217,8 @@ public class DagManifesFileScanner implements Closeable { } scanDir = nextDir; offsets = new HashMap<>(); - newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets); + retryCount = new HashMap<>(); + loadNewFiles(todayDir); } return true; } http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java index 8f89b2e..4ac64c6 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; @@ -43,27 +45,40 @@ import com.google.protobuf.Parser; * @param <T> The proto message type. */ public class DatePartitionedLogger<T extends MessageLite> { + private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class); // Everyone has permission to write, but with sticky set so that delete is restricted. // This is required, since the path is same for all users and everyone writes into it. private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); + // Since the directories have broad permissions restrict the file read access. + private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066); + private final Parser<T> parser; private final Path basePath; private final Configuration conf; private final Clock clock; - private final FileSystem fileSystem; public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock) throws IOException { - this.conf = conf; + this.conf = new Configuration(conf); this.clock = clock; this.parser = parser; - this.fileSystem = baseDir.getFileSystem(conf); - if (!fileSystem.exists(baseDir)) { - fileSystem.mkdirs(baseDir); - fileSystem.setPermission(baseDir, DIR_PERMISSION); + createDirIfNotExists(baseDir); + this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir); + FsPermission.setUMask(this.conf, FILE_UMASK); + } + + private void createDirIfNotExists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + try { + if (!fileSystem.exists(path)) { + fileSystem.mkdirs(path); + fileSystem.setPermission(path, DIR_PERMISSION); + } + } catch (IOException e) { + // Ignore this exception, if there is a problem it'll fail when trying to read 
or write. + LOG.warn("Error while trying to set permission: ", e); } - this.basePath = fileSystem.resolvePath(baseDir); } /** @@ -86,13 +101,14 @@ public class DatePartitionedLogger<T extends MessageLite> { */ public Path getPathForDate(LocalDate date, String fileName) throws IOException { Path path = new Path(basePath, getDirForDate(date)); - if (!fileSystem.exists(path)) { - fileSystem.mkdirs(path); - fileSystem.setPermission(path, DIR_PERMISSION); - } + createDirIfNotExists(path); return new Path(path, fileName); } + public Path getPathForSubdir(String dirName, String fileName) { + return new Path(new Path(basePath, dirName), fileName); + } + /** * Extract the date from the directory name, this should be a directory created by this class. */ @@ -116,6 +132,7 @@ public class DatePartitionedLogger<T extends MessageLite> { public String getNextDirectory(String currentDir) throws IOException { // Fast check, if the next day directory exists return it. String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1)); + FileSystem fileSystem = basePath.getFileSystem(conf); if (fileSystem.exists(new Path(basePath, nextDate))) { return nextDate; } @@ -135,10 +152,11 @@ public class DatePartitionedLogger<T extends MessageLite> { * Returns new or changed files in the given directory. The offsets are used to find * changed files. */ - public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) + public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) throws IOException { Path dirPath = new Path(basePath, subDir); - List<Path> newFiles = new ArrayList<>(); + FileSystem fileSystem = basePath.getFileSystem(conf); + List<FileStatus> newFiles = new ArrayList<>(); if (!fileSystem.exists(dirPath)) { return newFiles; } @@ -147,7 +165,7 @@ public class DatePartitionedLogger<T extends MessageLite> { Long offset = currentOffsets.get(fileName); // If the offset was never added or offset < fileSize. 
if (offset == null || offset < status.getLen()) { - newFiles.add(new Path(dirPath, fileName)); + newFiles.add(status); } } return newFiles; http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java index 60cbda5..206b1c1 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.history.logging.proto; import java.io.IOException; +import java.time.LocalDate; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,13 +50,15 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService { private boolean loggingDisabled = false; private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue = - new LinkedBlockingQueue<DAGHistoryEvent>(10000); + new LinkedBlockingQueue<>(10000); private Thread eventHandlingThread; private final AtomicBoolean stopped = new AtomicBoolean(false); private TezProtoLoggers loggers; private ProtoMessageWriter<HistoryEventProto> appEventsWriter; private ProtoMessageWriter<HistoryEventProto> dagEventsWriter; + private ProtoMessageWriter<ManifestEntryProto> manifestEventsWriter; + private LocalDate manifestDate; private TezDAGID currentDagId; private long dagSubmittedEventOffset = -1; @@ -101,6 +104,7 @@ public class 
ProtoHistoryLoggingService extends HistoryLoggingService { eventHandlingThread.join(); IOUtils.closeQuietly(appEventsWriter); IOUtils.closeQuietly(dagEventsWriter); + IOUtils.closeQuietly(manifestEventsWriter); LOG.info("Stopped ProtoHistoryLoggingService"); } @@ -161,7 +165,8 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService { } else if (type == HistoryEventType.DAG_SUBMITTED) { finishCurrentDag(null); currentDagId = dagId; - dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString()); + dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString() + + "_" + appContext.getApplicationAttemptId().getAttemptId()); dagSubmittedEventOffset = dagEventsWriter.getOffset(); dagEventsWriter.writeProto(converter.convert(historyEvent)); } else if (dagEventsWriter != null) { @@ -174,16 +179,21 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService { if (dagEventsWriter == null) { return; } - ProtoMessageWriter<ManifestEntryProto> writer = null; try { long finishEventOffset = -1; if (event != null) { finishEventOffset = dagEventsWriter.getOffset(); dagEventsWriter.writeProto(converter.convert(event)); } - // Do not cache this writer, it should be created at the time of writing - writer = loggers.getManifestEventsLogger() - .getWriter(appContext.getApplicationAttemptId().toString()); + DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger(); + if (manifestDate == null || !manifestDate.equals(manifestLogger.getNow().toLocalDate())) { + // The day has changed write to a new file. 
+ IOUtils.closeQuietly(manifestEventsWriter); + manifestEventsWriter = manifestLogger.getWriter( + appContext.getApplicationAttemptId().toString()); + manifestDate = manifestLogger.getDateFromDir( + manifestEventsWriter.getPath().getParent().getName()); + } ManifestEntryProto.Builder entry = ManifestEntryProto.newBuilder() .setDagId(currentDagId.toString()) .setAppId(currentDagId.getApplicationId().toString()) @@ -196,13 +206,13 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService { if (event != null) { entry.setDagId(event.getDagID().toString()); } - writer.writeProto(entry.build()); + manifestEventsWriter.writeProto(entry.build()); + manifestEventsWriter.hflush(); appEventsWriter.hflush(); } finally { // On an error, cleanup everything this will ensure, we do not use one dag's writer // into another dag. IOUtils.closeQuietly(dagEventsWriter); - IOUtils.closeQuietly(writer); dagEventsWriter = null; currentDagId = null; dagSubmittedEventOffset = -1; http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java index e5f5e6b..d736fea 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java @@ -24,19 +24,22 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; +import 
org.apache.hadoop.io.SequenceFile.Reader; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; public class ProtoMessageReader<T extends MessageLite> implements Closeable { private final Path filePath; - private final SequenceFile.Reader reader; + private final Reader reader; private final ProtoMessageWritable<T> writable; ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException { this.filePath = filePath; - this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath)); + // The writer does not flush the length during hflush. Using length options lets us read + // past length in the FileStatus but it will throw EOFException during a read instead + // of returning null. + this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE)); this.writable = new ProtoMessageWritable<>(parser); } http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java index ca9ba61..869b603 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java @@ -26,24 +26,24 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Writer; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; 
public class ProtoMessageWriter<T extends MessageLite> implements Closeable { private final Path filePath; - private final SequenceFile.Writer writer; + private final Writer writer; private final ProtoMessageWritable<T> writable; ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException { this.filePath = filePath; this.writer = SequenceFile.createWriter( conf, - SequenceFile.Writer.file(filePath), - SequenceFile.Writer.keyClass(NullWritable.class), - SequenceFile.Writer.valueClass(ProtoMessageWritable.class), - SequenceFile.Writer.appendIfExists(true), - SequenceFile.Writer.compression(CompressionType.RECORD)); + Writer.file(filePath), + Writer.keyClass(NullWritable.class), + Writer.valueClass(ProtoMessageWritable.class), + Writer.compression(CompressionType.RECORD)); this.writable = new ProtoMessageWritable<>(parser); } http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java index fcaa315..4950522 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java @@ -20,6 +20,9 @@ package org.apache.tez.dag.history.logging.proto; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.api.TezConfiguration; @@ -43,12 +46,14 @@ public class TestDagManifestFileScanner { clock = new MockClock(); Configuration conf = new Configuration(false); conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath); + // LocalFileSystem does not implement truncate. + conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); TezProtoLoggers loggers = new TezProtoLoggers(); loggers.setup(conf, clock); manifestLogger = loggers.getManifestEventsLogger(); } - @Test + @Test(timeout=5000) public void testNormal() throws Exception { clock.setTime(0); // 0th day. createManifestEvents(0, 8); @@ -85,6 +90,37 @@ public class TestDagManifestFileScanner { // Not able to test append since the LocalFileSystem does not implement append. } + private Path deleteFilePath = null; + @Test(timeout=5000) + public void testError() throws Exception { + clock.setTime(0); // 0th day. + createManifestEvents(0, 4); + corruptFiles(); + clock.setTime((24 * 60 * 60 + 1) * 1000); // 1 day 1 sec. + createManifestEvents(24 * 3600, 1); + + DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger); + Assert.assertNotNull(scanner.getNext()); + deleteFilePath.getFileSystem(manifestLogger.getConfig()).delete(deleteFilePath, false); + // 4 files - 1 file deleted - 1 truncated - 1 corrupted => 1 remains. + Assert.assertNull(scanner.getNext()); + + // Save offset for later use. + String offset = scanner.getOffset(); + + // Move time outside the window, it should skip files with error and give more data for + // next day. + clock.setTime((24 * 60 * 60 + 61) * 1000); // 1 day 61 sec. 
+ Assert.assertNotNull(scanner.getNext()); + Assert.assertNull(scanner.getNext()); + + // Reset the offset + scanner.setOffset(offset); + Assert.assertNotNull(scanner.getNext()); + Assert.assertNull(scanner.getNext()); + scanner.close(); + } + private void createManifestEvents(long time, int numEvents) throws IOException { for (int i = 0; i < numEvents; ++i) { ApplicationId appId = ApplicationId.newInstance(1000l, i); @@ -103,6 +139,33 @@ public class TestDagManifestFileScanner { } } + private void corruptFiles() throws IOException { + int op = 0; + Configuration conf = manifestLogger.getConfig(); + Path base = new Path( + conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR) + "/dag_meta"); + FileSystem fs = base.getFileSystem(conf); + for (FileStatus status : fs.listStatus(base)) { + if (status.isDirectory()) { + for (FileStatus file : fs.listStatus(status.getPath())) { + if (!file.getPath().getName().startsWith("application_")) { + continue; + } + switch (op) { + case 0: + case 1: + fs.truncate(file.getPath(), op == 1 ? 
0 : file.getLen() - 20); + break; + case 3: + deleteFilePath = file.getPath(); + break; + } + op++; + } + } + } + } + private static class MockClock implements Clock { private long time = 0; http://git-wip-us.apache.org/repos/asf/tez/blob/06757e9d/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java index 4bd5d4e..bc79b07 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.history.logging.proto; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.EOFException; import java.io.IOException; import java.time.LocalDate; import java.util.ArrayList; @@ -86,14 +87,18 @@ public class TestProtoHistoryLoggingService { // Verify dag events are logged. 
DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger(); - Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString()); + Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString() + "_" + 1); ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath); HistoryEventProto evt = reader.readEvent(); int ind = 1; while (evt != null) { Assert.assertEquals(protos.get(ind), evt); ind++; - evt = reader.readEvent(); + try { + evt = reader.readEvent(); + } catch (EOFException e) { + evt = null; + } } reader.close();
