TEZ-3915. Create protobuf based history event logger. (Harish Jaiprakash, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24b872a7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24b872a7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24b872a7 Branch: refs/heads/master Commit: 24b872a7fc7a9bca11cbf4b5da80198386485547 Parents: 871ea80 Author: Harish JP <[email protected]> Authored: Fri Apr 20 13:41:37 2018 +0530 Committer: Harish JP <[email protected]> Committed: Fri Apr 20 13:41:37 2018 +0530 ---------------------------------------------------------------------- pom.xml | 2 +- .../apache/tez/dag/api/TezConfiguration.java | 21 + tez-dist/pom.xml | 10 + tez-plugins/pom.xml | 2 + .../findbugs-exclude.xml | 50 ++ tez-plugins/tez-protobuf-history-plugin/pom.xml | 92 +++ .../logging/proto/DagManifesFileScanner.java | 150 ++++ .../logging/proto/DatePartitionedLogger.java | 167 +++++ .../proto/HistoryEventProtoConverter.java | 498 +++++++++++++ .../proto/ProtoHistoryLoggingService.java | 211 ++++++ .../logging/proto/ProtoMessageReader.java | 66 ++ .../logging/proto/ProtoMessageWritable.java | 101 +++ .../logging/proto/ProtoMessageWriter.java | 71 ++ .../history/logging/proto/TezProtoLoggers.java | 64 ++ .../src/main/proto/HistoryLogger.proto | 49 ++ .../proto/TestDagManifestFileScanner.java | 118 +++ .../proto/TestHistoryEventProtoConverter.java | 716 +++++++++++++++++++ .../proto/TestProtoHistoryLoggingService.java | 195 +++++ 18 files changed, 2582 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 34240cf..16745f0 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ <properties> <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile> <clover.license>${user.home}/clover.license</clover.license> - 
<hadoop.version>2.7.0</hadoop.version> + <hadoop.version>2.7.2</hadoop.version> <jetty.version>9.3.22.v20171030</jetty.version> <netty.version>3.6.2.Final</netty.version> <pig.version>0.13.0</pig.version> http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 6d3050d..243f278 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1439,6 +1439,27 @@ public class TezConfiguration extends Configuration { public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10; /** + * String value. The base directory into which history data will be written when proto history + * logging service is used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}. + * If this is not set, then logging is disabled for ProtoHistoryLoggingService. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_HISTORY_LOGGING_PROTO_BASE_DIR = + TEZ_PREFIX + "history.logging.proto-base-dir"; + + /** + * Long value. The amount of time in seconds to wait to ensure all events for a day is synced + * to disk. This should be maximum time variation b/w machines + maximum time to sync file + * content and metadata. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="long") + public static final String TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS = + TEZ_PREFIX + "history.logging.proto-sync-window-secs"; + public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L; + + /** * Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown. * Expert level setting. 
*/ http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml index 854a548..9447fe7 100644 --- a/tez-dist/pom.xml +++ b/tez-dist/pom.xml @@ -69,6 +69,11 @@ </dependency> <dependency> <groupId>org.apache.tez</groupId> + <artifactId>tez-protobuf-history-plugin</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> <artifactId>hadoop-shim-2.7</artifactId> <version>${project.version}</version> </dependency> @@ -97,6 +102,11 @@ </dependency> <dependency> <groupId>org.apache.tez</groupId> + <artifactId>tez-protobuf-history-plugin</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> <artifactId>hadoop-shim-2.8</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/pom.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml index fb0691a..f90cae7 100644 --- a/tez-plugins/pom.xml +++ b/tez-plugins/pom.xml @@ -35,6 +35,7 @@ </property> </activation> <modules> + <module>tez-protobuf-history-plugin</module> <module>tez-yarn-timeline-history</module> <module>tez-yarn-timeline-history-with-acls</module> <module>tez-history-parser</module> @@ -47,6 +48,7 @@ <activeByDefault>false</activeByDefault> </activation> <modules> + <module>tez-protobuf-history-plugin</module> <module>tez-yarn-timeline-history</module> <module>tez-yarn-timeline-history-with-acls</module> <module>tez-yarn-timeline-cache-plugin</module> http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml 
b/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml new file mode 100644 index 0000000..c91265d --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/findbugs-exclude.xml @@ -0,0 +1,50 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<FindBugsFilter> + <Match> + <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$KVPair"/> + <Field name="unknownFields"/> + <Bug pattern="SE_BAD_FIELD"/> + </Match> + + <Match> + <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$.*Proto"/> + <Field name="unknownFields"/> + <Bug pattern="SE_BAD_FIELD"/> + </Match> + + <Match> + <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$KVPair"/> + <Field name="PARSER"/> + <Bug pattern="MS_SHOULD_BE_FINAL"/> + </Match> + + <Match> + <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$.*Proto"/> + <Field name="PARSER"/> + <Bug pattern="MS_SHOULD_BE_FINAL"/> + </Match> + + <Match> + <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$KVPair\$Builder"/> + <Method name="maybeForceBuilderInitialization"/> + <Bug pattern="UCF_USELESS_CONTROL_FLOW"/> + </Match> + + <Match> + <Class name="~org\.apache\.tez\.dag\.history\.logging\.proto\.HistoryLoggerProtos\$.*Proto\$Builder"/> + <Method name="maybeForceBuilderInitialization"/> + <Bug pattern="UCF_USELESS_CONTROL_FLOW"/> + </Match> +</FindBugsFilter> 
http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/pom.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/pom.xml b/tez-plugins/tez-protobuf-history-plugin/pom.xml new file mode 100644 index 0000000..880aca9 --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/pom.xml @@ -0,0 +1,92 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tez</groupId> + <artifactId>tez-plugins</artifactId> + <version>0.9.2-SNAPSHOT</version> + </parent> + <artifactId>tez-protobuf-history-plugin</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> 
+ <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-maven-plugins</artifactId> + <executions> + <execution> + <id>compile-protoc</id> + <phase>generate-sources</phase> + <goals> + <goal>protoc</goal> + </goals> + <configuration> + <protocVersion>${protobuf.version}</protocVersion> + <protocCommand>${protoc.path}</protocCommand> + <imports> + <param>${basedir}/src/main/proto</param> + </imports> + <source> + <directory>${basedir}/src/main/proto</directory> + <includes> + <include>HistoryLogger.proto</include> + </includes> + </source> + <output>${project.build.directory}/generated-sources/java</output> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java new file mode 100644 index 0000000..c8ea02f --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.history.logging.proto; + +import java.io.Closeable; +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Helper class to scan all the dag manifest files to get manifest entries. 
+ */ +public class DagManifesFileScanner implements Closeable { + private static final int OFFSET_VERSION = 1; + + private final ObjectMapper mapper = new ObjectMapper(); + private final DatePartitionedLogger<ManifestEntryProto> manifestLogger; + private final long syncTime; + + private String scanDir; + private Map<String, Long> offsets; + private List<Path> newFiles; + + private ProtoMessageReader<ManifestEntryProto> reader; + + public DagManifesFileScanner(DatePartitionedLogger<ManifestEntryProto> manifestLogger) { + this.manifestLogger = manifestLogger; + this.syncTime = manifestLogger.getConfig().getLong( + TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS, + TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT); + this.setOffset(LocalDate.ofEpochDay(0)); + } + + // All public to simplify json conversion. + public static class DagManifestOffset { + public int version; + public String scanDir; + public Map<String, Long> offsets; + } + + public void setOffset(String offset) { + try { + DagManifestOffset dagOffset = mapper.readValue(offset, DagManifestOffset.class); + if (dagOffset.version != OFFSET_VERSION) { + throw new IllegalArgumentException("Version mismatch: " + dagOffset.version); + } + this.scanDir = dagOffset.scanDir; + this.offsets = dagOffset.offsets; + this.newFiles = new ArrayList<>(); + } catch (IOException e) { + throw new IllegalArgumentException("Invalid offset", e); + } + } + + public void setOffset(LocalDate date) { + this.scanDir = manifestLogger.getDirForDate(date); + this.offsets = new HashMap<>(); + this.newFiles = new ArrayList<>(); + } + + public String getOffset() { + try { + DagManifestOffset offset = new DagManifestOffset(); + offset.version = OFFSET_VERSION; + offset.scanDir = scanDir; + offset.offsets = offsets; + return mapper.writeValueAsString(offset); + } catch (IOException e) { + throw new RuntimeException("Unexpected exception while converting to json.", e); + } + } + + public ManifestEntryProto 
getNext() throws IOException { + while (true) { + if (reader != null) { + ManifestEntryProto evt = reader.readEvent(); + if (evt != null) { + offsets.put(reader.getFilePath().getName(), reader.getOffset()); + return evt; + } else { + IOUtils.closeQuietly(reader); + reader = null; + } + } + if (!newFiles.isEmpty()) { + this.reader = manifestLogger.getReader(newFiles.remove(0)); + } else { + if (!loadMore()) { + return null; + } + } + } + } + + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + private boolean loadMore() throws IOException { + newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets); + while (newFiles.isEmpty()) { + LocalDateTime utcNow = manifestLogger.getNow(); + if (utcNow.getHour() * 3600 + utcNow.getMinute() * 60 + utcNow.getSecond() < syncTime) { + // We are in the delay window for today, do not advance date if we are moving from + // yesterday. + String yesterDir = manifestLogger.getDirForDate(utcNow.toLocalDate().minusDays(1)); + if (yesterDir.equals(scanDir)) { + return false; + } + } + String nextDir = manifestLogger.getNextDirectory(scanDir); + if (nextDir == null) { + return false; + } + scanDir = nextDir; + offsets = new HashMap<>(); + newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets); + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java new file mode 100644 index 0000000..8f89b2e --- /dev/null +++ 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.util.Clock; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +/** + * Class to create proto reader and writer for a date partitioned directory structure. + * + * @param <T> The proto message type. + */ +public class DatePartitionedLogger<T extends MessageLite> { + // Everyone has permission to write, but with sticky set so that delete is restricted. + // This is required, since the path is same for all users and everyone writes into it. 
+ private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); + + private final Parser<T> parser; + private final Path basePath; + private final Configuration conf; + private final Clock clock; + private final FileSystem fileSystem; + + public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock) + throws IOException { + this.conf = conf; + this.clock = clock; + this.parser = parser; + this.fileSystem = baseDir.getFileSystem(conf); + if (!fileSystem.exists(baseDir)) { + fileSystem.mkdirs(baseDir); + fileSystem.setPermission(baseDir, DIR_PERMISSION); + } + this.basePath = fileSystem.resolvePath(baseDir); + } + + /** + * Creates a writer for the given fileName, with date as today. + */ + public ProtoMessageWriter<T> getWriter(String fileName) throws IOException { + Path filePath = getPathForDate(getNow().toLocalDate(), fileName); + return new ProtoMessageWriter<>(conf, filePath, parser); + } + + /** + * Creates a reader for the given filePath, no validation is done. + */ + public ProtoMessageReader<T> getReader(Path filePath) throws IOException { + return new ProtoMessageReader<>(conf, filePath, parser); + } + + /** + * Create a path for the given date and fileName. This can be used to create a reader. + */ + public Path getPathForDate(LocalDate date, String fileName) throws IOException { + Path path = new Path(basePath, getDirForDate(date)); + if (!fileSystem.exists(path)) { + fileSystem.mkdirs(path); + fileSystem.setPermission(path, DIR_PERMISSION); + } + return new Path(path, fileName); + } + + /** + * Extract the date from the directory name, this should be a directory created by this class. 
+ */ + public LocalDate getDateFromDir(String dirName) { + if (!dirName.startsWith("date=")) { + throw new IllegalArgumentException("Invalid directory: "+ dirName); + } + return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE); + } + + /** + * Returns the directory name for a given date. + */ + public String getDirForDate(LocalDate date) { + return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date); + } + + /** + * Find next available directory, after the given directory. + */ + public String getNextDirectory(String currentDir) throws IOException { + // Fast check, if the next day directory exists return it. + String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1)); + if (fileSystem.exists(new Path(basePath, nextDate))) { + return nextDate; + } + // Have to scan the directory to find min date greater than currentDir. + String dirName = null; + for (FileStatus status : fileSystem.listStatus(basePath)) { + String name = status.getPath().getName(); + // String comparison is good enough, since its of form date=yyyy-MM-dd + if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) { + dirName = name; + } + } + return dirName; + } + + /** + * Returns new or changed files in the given directory. The offsets are used to find + * changed files. + */ + public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) + throws IOException { + Path dirPath = new Path(basePath, subDir); + List<Path> newFiles = new ArrayList<>(); + if (!fileSystem.exists(dirPath)) { + return newFiles; + } + for (FileStatus status : fileSystem.listStatus(dirPath)) { + String fileName = status.getPath().getName(); + Long offset = currentOffsets.get(fileName); + // If the offset was never added or offset < fileSize. 
+ if (offset == null || offset < status.getLen()) { + newFiles.add(new Path(dirPath, fileName)); + } + } + return newFiles; + } + + /** + * Returns the current time, using the underlying clock in UTC time. + */ + public LocalDateTime getNow() { + // Use UTC date to ensure reader date is same on all timezones. + return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC); + } + + public Configuration getConfig() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java new file mode 100644 index 0000000..44dccb6 --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; +import org.apache.tez.dag.app.web.AMWebController; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto; +import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Convert history event into HistoryEventProto message. + */ +public class HistoryEventProtoConverter { + private static final Logger log = + LoggerFactory.getLogger(HistoryEventProtoConverter.class); + + private final ObjectMapper mapper = new ObjectMapper(); + + /** + * Convert a given history event to HistoryEventProto message. + */ + public HistoryEventProto convert(HistoryEvent historyEvent) { + validateEvent(historyEvent); + switch (historyEvent.getEventType()) { + case APP_LAUNCHED: + return convertAppLaunchedEvent((AppLaunchedEvent) historyEvent); + case AM_LAUNCHED: + return convertAMLaunchedEvent((AMLaunchedEvent) historyEvent); + case AM_STARTED: + return convertAMStartedEvent((AMStartedEvent) historyEvent); + case CONTAINER_LAUNCHED: + return convertContainerLaunchedEvent((ContainerLaunchedEvent) historyEvent); + case CONTAINER_STOPPED: + return convertContainerStoppedEvent((ContainerStoppedEvent) historyEvent); + case DAG_SUBMITTED: + return convertDAGSubmittedEvent((DAGSubmittedEvent) historyEvent); + case DAG_INITIALIZED: + return convertDAGInitializedEvent((DAGInitializedEvent) historyEvent); + case DAG_STARTED: + return convertDAGStartedEvent((DAGStartedEvent) historyEvent); + case DAG_FINISHED: + return convertDAGFinishedEvent((DAGFinishedEvent) historyEvent); + case VERTEX_INITIALIZED: + return convertVertexInitializedEvent((VertexInitializedEvent) historyEvent); + case VERTEX_STARTED: + return convertVertexStartedEvent((VertexStartedEvent) 
historyEvent); + case VERTEX_FINISHED: + return convertVertexFinishedEvent((VertexFinishedEvent) historyEvent); + case TASK_STARTED: + return convertTaskStartedEvent((TaskStartedEvent) historyEvent); + case TASK_FINISHED: + return convertTaskFinishedEvent((TaskFinishedEvent) historyEvent); + case TASK_ATTEMPT_STARTED: + return convertTaskAttemptStartedEvent((TaskAttemptStartedEvent) historyEvent); + case TASK_ATTEMPT_FINISHED: + return convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent); + case VERTEX_CONFIGURE_DONE: + return convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent); + case DAG_RECOVERED: + return convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent); + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_COMMIT_STARTED: + case DAG_KILL_REQUEST: + throw new UnsupportedOperationException("Invalid Event, does not support history, eventType=" + + historyEvent.getEventType()); + // Do not add default, if a new event type is added, we'll get a warning for the + // switch. 
+ } + throw new UnsupportedOperationException( + "Unhandled Event, eventType=" + historyEvent.getEventType()); + } + + private void validateEvent(HistoryEvent event) { + if (!event.isHistoryEvent()) { + throw new UnsupportedOperationException( + "Invalid Event, does not support history" + ", eventType=" + event.getEventType()); + } + } + + private HistoryEventProto.Builder makeBuilderForEvent(HistoryEvent event, long time, + TezDAGID dagId, ApplicationId appId, ApplicationAttemptId appAttemptId, TezVertexID vertexId, + TezTaskID taskId, TezTaskAttemptID taskAttemptId, String user) { + HistoryEventProto.Builder builder = HistoryEventProto.newBuilder(); + builder.setEventType(event.getEventType().name()); + builder.setEventTime(time); + if (taskAttemptId != null) { + builder.setTaskAttemptId(taskAttemptId.toString()); + taskId = taskAttemptId.getTaskID(); + } + if (taskId != null) { + builder.setTaskId(taskId.toString()); + vertexId = taskId.getVertexID(); + } + if (vertexId != null) { + builder.setVertexId(vertexId.toString()); + dagId = vertexId.getDAGId(); + } + if (dagId != null) { + builder.setDagId(dagId.toString()); + if (appId == null) { + appId = dagId.getApplicationId(); + } + } + if (appAttemptId != null) { + builder.setAppAttemptId(appAttemptId.toString()); + if (appId == null) { + appId = appAttemptId.getApplicationId(); + } + } + if (appId != null) { + builder.setAppId(appId.toString()); + } + if (user != null) { + builder.setUser(user); + } + return builder; + } + + private void addEventData(HistoryEventProto.Builder builder, String key, String value) { + if (value == null) { + return; + } + builder.addEventData(KVPair.newBuilder().setKey(key).setValue(value)); + } + + private void addEventData(HistoryEventProto.Builder builder, String key, Number value) { + builder.addEventData(KVPair.newBuilder().setKey(key).setValue(value.toString())); + } + + private void addEventData(HistoryEventProto.Builder builder, String key, + Map<String, Object> value) { + 
try { + builder.addEventData( + KVPair.newBuilder().setKey(key).setValue(mapper.writeValueAsString(value))); + } catch (IOException e) { + log.error("Error converting value for key {} to json: ", key, e); + } + } + + private HistoryEventProto convertAppLaunchedEvent(AppLaunchedEvent event) { + HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null, + event.getApplicationId(), null, null, null, null, event.getUser()); + // This is ok as long as we do not modify the underlying map. + @SuppressWarnings({ "unchecked", "rawtypes" }) + Map<String, Object> confMap = (Map)DAGUtils.convertConfigurationToATSMap(event.getConf()); + addEventData(builder, ATSConstants.CONFIG, confMap); + if (event.getVersion() != null) { + addEventData(builder, ATSConstants.TEZ_VERSION, + DAGUtils.convertTezVersionToATSMap(event.getVersion())); + } + addEventData(builder, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); + return builder.build(); + } + + private HistoryEventProto convertAMLaunchedEvent(AMLaunchedEvent event) { + HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null, + null, event.getApplicationAttemptId(), null, null, null, event.getUser()); + addEventData(builder, ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime()); + return builder.build(); + } + + private HistoryEventProto convertAMStartedEvent(AMStartedEvent event) { + HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(), null, + null, event.getApplicationAttemptId(), null, null, null, event.getUser()); + return builder.build(); + } + + private HistoryEventProto convertContainerLaunchedEvent(ContainerLaunchedEvent event) { + HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getLaunchTime(), null, + null, event.getApplicationAttemptId(), null, null, null, null); + addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + return builder.build(); + } + + 
private HistoryEventProto convertContainerStoppedEvent(ContainerStoppedEvent event) { + HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStoppedTime(), null, + null, event.getApplicationAttemptId(), null, null, null, null); + addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + addEventData(builder, ATSConstants.EXIT_STATUS, event.getExitStatus()); + addEventData(builder, ATSConstants.FINISH_TIME, event.getStoppedTime()); + return builder.build(); + } + + private HistoryEventProto convertDAGSubmittedEvent(DAGSubmittedEvent event) { + HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getSubmitTime(), + event.getDagID(), null, event.getApplicationAttemptId(), null, null, null, + event.getUser()); + addEventData(builder, ATSConstants.DAG_NAME, event.getDAGName()); + if (event.getDAGPlan().hasCallerContext() && + event.getDAGPlan().getCallerContext().hasCallerId()) { + CallerContextProto callerContext = event.getDagPlan().getCallerContext(); + addEventData(builder, ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId()); + addEventData(builder, ATSConstants.CALLER_CONTEXT_TYPE, callerContext.getCallerType()); + addEventData(builder, ATSConstants.CALLER_CONTEXT, callerContext.getContext()); + } + if (event.getQueueName() != null) { + addEventData(builder, ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); + } + addEventData(builder, ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); + addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL + "_" + + event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs()); + try { + addEventData(builder, ATSConstants.DAG_PLAN, + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + return builder.build(); + } + + private HistoryEventProto convertDAGInitializedEvent(DAGInitializedEvent event) { + HistoryEventProto.Builder builder = 
        makeBuilderForEvent(event, event.getInitTime(),
        event.getDagID(), null, null, null, null, null, event.getUser());
    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());

    if (event.getVertexNameIDMap() != null) {
      // Serialized as a vertexName -> vertexId-string JSON map; TreeMap keeps key order stable.
      Map<String, Object> nameIdStrMap = new TreeMap<String, Object>();
      for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
        nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
      }
      addEventData(builder, ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap);
    }
    return builder.build();
  }

  private HistoryEventProto convertDAGStartedEvent(DAGStartedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
        event.getDagID(), null, null, null, null, null, event.getUser());

    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
    addEventData(builder, ATSConstants.STATUS, event.getDagState().name());

    return builder.build();
  }

  /** Captures final dag state: caller context, timings, status, diagnostics, counters and stats. */
  private HistoryEventProto convertDAGFinishedEvent(DAGFinishedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
        event.getDagID(), null, event.getApplicationAttemptId(), null, null, null,
        event.getUser());
    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
    if (event.getDAGPlan().hasCallerContext()) {
      if (event.getDAGPlan().getCallerContext().hasCallerType()) {
        addEventData(builder, ATSConstants.CALLER_CONTEXT_TYPE,
            event.getDAGPlan().getCallerContext().getCallerType());
      }
      if (event.getDAGPlan().getCallerContext().hasCallerId()) {
        addEventData(builder, ATSConstants.CALLER_CONTEXT_ID,
            event.getDAGPlan().getCallerContext().getCallerId());
      }
    }
    addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
    addEventData(builder, ATSConstants.STATUS, event.getState().name());
    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
    addEventData(builder, ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
        event.getApplicationAttemptId().toString());
    addEventData(builder, ATSConstants.COUNTERS,
        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
    // Per-state task counts are emitted as individual top-level entries keyed by stat name.
    Map<String, Integer> dagTaskStats = event.getDagTaskStats();
    if (dagTaskStats != null) {
      for (Entry<String, Integer> entry : dagTaskStats.entrySet()) {
        addEventData(builder, entry.getKey(), entry.getValue());
      }
    }
    return builder.build();
  }

  private HistoryEventProto convertTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
        null, null, null, null, null, event.getTaskAttemptID(), null);
    if (event.getInProgressLogsUrl() != null) {
      addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
    }
    if (event.getCompletedLogsUrl() != null) {
      addEventData(builder, ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
    }
    addEventData(builder, ATSConstants.NODE_ID, event.getNodeId().toString());
    addEventData(builder, ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
    addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
    // A started attempt is always reported as RUNNING.
    addEventData(builder, ATSConstants.STATUS, TaskAttemptState.RUNNING.name());

    return builder.build();
  }

  private HistoryEventProto convertTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
        null, null, null, null, null, event.getTaskAttemptID(), null);

    addEventData(builder, ATSConstants.STATUS, event.getState().name());
    if (event.getTaskFailureType() != null) {
      addEventData(builder, ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name());
    }

    addEventData(builder, ATSConstants.CREATION_TIME, event.getCreationTime());
    addEventData(builder, ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
    addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
    if (event.getCreationCausalTA() != null) {
      addEventData(builder, ATSConstants.CREATION_CAUSAL_ATTEMPT,
          event.getCreationCausalTA().toString());
    }
    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
    if (event.getTaskAttemptError() != null) {
      addEventData(builder, ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
          event.getTaskAttemptError().name());
    }
    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
    addEventData(builder, ATSConstants.COUNTERS,
        DAGUtils.convertCountersToATSMap(event.getCounters()));
    if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
      addEventData(builder, ATSConstants.LAST_DATA_EVENTS,
          DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
    }
    // Node/container/log info can be absent on a finished attempt, hence the null checks.
    if (event.getNodeId() != null) {
      addEventData(builder, ATSConstants.NODE_ID, event.getNodeId().toString());
    }
    if (event.getContainerId() != null) {
      addEventData(builder, ATSConstants.CONTAINER_ID, event.getContainerId().toString());
    }
    if (event.getInProgressLogsUrl() != null) {
      addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
    }
    if (event.getCompletedLogsUrl() != null) {
      addEventData(builder, ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
    }
    if (event.getNodeHttpAddress() != null) {
      addEventData(builder, ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
    }

    return builder.build();
  }

  private HistoryEventProto convertTaskFinishedEvent(TaskFinishedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
        null, null, null, null, event.getTaskID(), null, null);

    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
    addEventData(builder, ATSConstants.STATUS, event.getState().name());
    addEventData(builder, ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, event.getNumFailedAttempts());
    if (event.getSuccessfulAttemptID() != null) {
      addEventData(builder, ATSConstants.SUCCESSFUL_ATTEMPT_ID,
          event.getSuccessfulAttemptID().toString());
    }

    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
    addEventData(builder, ATSConstants.COUNTERS,
        DAGUtils.convertCountersToATSMap(event.getTezCounters()));

    return builder.build();
  }

  private HistoryEventProto convertTaskStartedEvent(TaskStartedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
        null, null, null, null, event.getTaskID(), null, null);

    addEventData(builder, ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
    addEventData(builder, ATSConstants.STATUS, event.getState().name());

    return builder.build();
  }

  private HistoryEventProto convertVertexFinishedEvent(VertexFinishedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
        null, null, null, event.getVertexID(), null, null, null);

    addEventData(builder, ATSConstants.STATUS, event.getState().name());
    addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
    addEventData(builder, ATSConstants.DIAGNOSTICS, event.getDiagnostics());
    addEventData(builder, ATSConstants.COUNTERS,
        DAGUtils.convertCountersToATSMap(event.getTezCounters()));
    addEventData(builder, ATSConstants.STATS,
        DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
    if (event.getServicePluginInfo() != null) {
      addEventData(builder, ATSConstants.SERVICE_PLUGIN,
          DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
    }

    // Like dag task stats: one top-level entry per stat name.
    final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
    if (vertexTaskStats != null) {
      for (Entry<String, Integer> entry : vertexTaskStats.entrySet()) {
        addEventData(builder, entry.getKey(), entry.getValue());
      }
    }

    return builder.build();
  }

  private HistoryEventProto convertVertexInitializedEvent(VertexInitializedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getInitedTime(),
        null, null, null, event.getVertexID(), null, null, null);
    addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
    addEventData(builder, ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime());
    addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
    addEventData(builder, ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
    if (event.getServicePluginInfo() != null) {
      addEventData(builder, ATSConstants.SERVICE_PLUGIN,
          DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
    }

    return builder.build();
  }

  private HistoryEventProto convertVertexStartedEvent(VertexStartedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getStartTime(),
        null, null, null, event.getVertexID(), null, null, null);
    addEventData(builder, ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime());
    addEventData(builder, ATSConstants.STATUS, event.getVertexState().name());
    return builder.build();
  }

  /** Records the edge managers and task count after a vertex reconfiguration. */
  private HistoryEventProto convertVertexReconfigureDoneEvent(VertexConfigurationDoneEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getReconfigureDoneTime(),
        null, null, null, event.getVertexID(), null, null, null);
    if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
      Map<String, Object> updatedEdgeManagers = new HashMap<String, Object>();
      for (Entry<String, EdgeProperty> entry : event.getSourceEdgeProperties().entrySet()) {
        updatedEdgeManagers.put(entry.getKey(), DAGUtils.convertEdgeProperty(entry.getValue()));
      }
      addEventData(builder, ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
    }
    addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
    return builder.build();
  }

  private HistoryEventProto convertDAGRecoveredEvent(DAGRecoveredEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getRecoveredTime(),
        event.getDagID(), null, event.getApplicationAttemptId(), null, null, null,
        event.getUser());
    addEventData(builder, ATSConstants.DAG_NAME, event.getDagName());
    if (event.getRecoveredDagState() != null) {
      addEventData(builder, ATSConstants.DAG_STATE, event.getRecoveredDagState().name());
    }
    if (event.getRecoveryFailureReason() != null) {
      addEventData(builder, ATSConstants.RECOVERY_FAILURE_REASON,
          event.getRecoveryFailureReason());
    }
    // Log url key is suffixed with the attempt id, same as in convertDAGSubmittedEvent.
    addEventData(builder, ATSConstants.IN_PROGRESS_LOGS_URL + "_"
        + event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs());
    return builder.build();
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto; +import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto; +import org.apache.tez.dag.records.TezDAGID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Logging service to write history events serialized using protobuf into sequence files. + * This can be used as external tables in hive. Or the reader can be used independently to + * read the data from these files. 
+ */ +public class ProtoHistoryLoggingService extends HistoryLoggingService { + private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryLoggingService.class); + private final HistoryEventProtoConverter converter = + new HistoryEventProtoConverter(); + private boolean loggingDisabled = false; + + private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue = + new LinkedBlockingQueue<DAGHistoryEvent>(10000); + private Thread eventHandlingThread; + private final AtomicBoolean stopped = new AtomicBoolean(false); + + private TezProtoLoggers loggers; + private ProtoMessageWriter<HistoryEventProto> appEventsWriter; + private ProtoMessageWriter<HistoryEventProto> dagEventsWriter; + private TezDAGID currentDagId; + private long dagSubmittedEventOffset = -1; + + private String appEventsFile; + private long appLaunchedEventOffset; + + public ProtoHistoryLoggingService() { + super(ProtoHistoryLoggingService.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + LOG.info("Initing ProtoHistoryLoggingService"); + setConfig(conf); + loggingDisabled = !conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); + LOG.info("Inited ProtoHistoryLoggingService"); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting ProtoHistoryLoggingService"); + if (!loggingDisabled) { + loggers = new TezProtoLoggers(); + if (!loggers.setup(getConfig(), appContext.getClock())) { + LOG.warn("Log file location for ProtoHistoryLoggingService not specified, " + + "logging disabled"); + loggingDisabled = true; + return; + } + appEventsWriter = loggers.getAppEventsLogger().getWriter( + appContext.getApplicationAttemptId().toString()); + eventHandlingThread = new Thread(this::loop, "HistoryEventHandlingThread"); + eventHandlingThread.start(); + } + LOG.info("Started ProtoHistoryLoggingService"); + } + + @Override + protected void 
serviceStop() throws Exception { + LOG.info("Stopping ProtoHistoryLoggingService, eventQueueBacklog=" + eventQueue.size()); + stopped.set(true); + eventHandlingThread.join(); + IOUtils.closeQuietly(appEventsWriter); + IOUtils.closeQuietly(dagEventsWriter); + LOG.info("Stopped ProtoHistoryLoggingService"); + } + + @Override + public void handle(DAGHistoryEvent event) { + if (loggingDisabled || stopped.get()) { + return; + } + try { + eventQueue.add(event); + } catch (IllegalStateException e) { + LOG.error("Queue capacity filled up, ignoring event: " + + event.getHistoryEvent().getEventType()); + if (LOG.isDebugEnabled()) { + LOG.debug("Queue capacity filled up, ignoring event: {}", event.getHistoryEvent()); + } + } + } + + private void loop() { + // Keep looping while the service is not stopped. + // Drain any left over events after the service has been stopped. + while (!stopped.get() || !eventQueue.isEmpty()) { + DAGHistoryEvent evt = null; + try { + evt = eventQueue.poll(100, TimeUnit.MILLISECONDS); + if (evt != null) { + handleEvent(evt); + } + } catch (InterruptedException e) { + LOG.info("EventQueue poll interrupted, ignoring it.", e); + } catch (IOException e) { + TezDAGID dagid = evt.getDagID(); + HistoryEventType type = evt.getHistoryEvent().getEventType(); + // Retry is hard, because there are several places where this exception can happen + // the state will get messed up a lot. 
+ LOG.error("Got exception while handling event {} for dag {}.", type, dagid, e); + } + } + } + + private void handleEvent(DAGHistoryEvent event) throws IOException { + if (loggingDisabled) { + return; + } + HistoryEvent historyEvent = event.getHistoryEvent(); + if (event.getDagID() == null) { + if (historyEvent.getEventType() == HistoryEventType.APP_LAUNCHED) { + appEventsFile = appEventsWriter.getPath().toString(); + appLaunchedEventOffset = appEventsWriter.getOffset(); + } + appEventsWriter.writeProto(converter.convert(historyEvent)); + } else { + HistoryEventType type = historyEvent.getEventType(); + TezDAGID dagId = event.getDagID(); + if (type == HistoryEventType.DAG_FINISHED) { + finishCurrentDag((DAGFinishedEvent)historyEvent); + } else if (type == HistoryEventType.DAG_SUBMITTED) { + finishCurrentDag(null); + currentDagId = dagId; + dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString()); + dagSubmittedEventOffset = dagEventsWriter.getOffset(); + dagEventsWriter.writeProto(converter.convert(historyEvent)); + } else if (dagEventsWriter != null) { + dagEventsWriter.writeProto(converter.convert(historyEvent)); + } + } + } + + private void finishCurrentDag(DAGFinishedEvent event) throws IOException { + if (dagEventsWriter == null) { + return; + } + ProtoMessageWriter<ManifestEntryProto> writer = null; + try { + long finishEventOffset = -1; + if (event != null) { + finishEventOffset = dagEventsWriter.getOffset(); + dagEventsWriter.writeProto(converter.convert(event)); + } + // Do not cache this writer, it should be created at the time of writing + writer = loggers.getManifestEventsLogger() + .getWriter(appContext.getApplicationAttemptId().toString()); + ManifestEntryProto.Builder entry = ManifestEntryProto.newBuilder() + .setDagId(currentDagId.toString()) + .setAppId(currentDagId.getApplicationId().toString()) + .setDagSubmittedEventOffset(dagSubmittedEventOffset) + .setDagFinishedEventOffset(finishEventOffset) + 
.setDagFilePath(dagEventsWriter.getPath().toString()) + .setAppFilePath(appEventsFile) + .setAppLaunchedEventOffset(appLaunchedEventOffset) + .setWriteTime(System.currentTimeMillis()); + if (event != null) { + entry.setDagId(event.getDagID().toString()); + } + writer.writeProto(entry.build()); + appEventsWriter.hflush(); + } finally { + // On an error, cleanup everything this will ensure, we do not use one dag's writer + // into another dag. + IOUtils.closeQuietly(dagEventsWriter); + IOUtils.closeQuietly(writer); + dagEventsWriter = null; + currentDagId = null; + dagSubmittedEventOffset = -1; + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java new file mode 100644 index 0000000..e5f5e6b --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.dag.history.logging.proto;

import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

/**
 * Reads protobuf messages of one type from a sequence file of NullWritable keys and
 * ProtoMessageWritable values. Supports seeking to byte offsets so individual events
 * can be looked up directly.
 */
public class ProtoMessageReader<T extends MessageLite> implements Closeable {
  private final Path filePath;
  private final SequenceFile.Reader reader;
  // Reused for every record to avoid per-message writable allocation.
  private final ProtoMessageWritable<T> writable;

  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
    this.filePath = filePath;
    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
    this.writable = new ProtoMessageWritable<>(parser);
  }

  public Path getFilePath() {
    return filePath;
  }

  /** Seeks the underlying sequence file to the given byte offset. */
  public void setOffset(long offset) throws IOException {
    reader.seek(offset);
  }

  /** @return the current byte position in the underlying sequence file. */
  public long getOffset() throws IOException {
    return reader.getPosition();
  }

  /** @return the next message, or null when the end of the file is reached. */
  public T readEvent() throws IOException {
    if (!reader.next(NullWritable.get(), writable)) {
      return null;
    }
    return writable.getMessage();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java new file mode 100644 index 0000000..34e4701 --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.tez.dag.history.logging.proto;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.io.Writable;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

/**
 * Hadoop Writable adapter for a protobuf message, allowing protos to be stored as
 * sequence file values. Uses protobuf's length-delimited encoding
 * (writeMessageNoTag on write, readMessage on read).
 */
public class ProtoMessageWritable<T extends MessageLite> implements Writable {
  private T message;
  private final Parser<T> parser;
  // Stream adapters are created lazily on first use and then reused; only the wrapped
  // DataInput/DataOutput reference is swapped on each call.
  private DataOutputStream dos;
  private CodedOutputStream cos;
  private DataInputStream din;
  private CodedInputStream cin;

  ProtoMessageWritable(Parser<T> parser) {
    this.parser = parser;
  }

  public T getMessage() {
    return message;
  }

  public void setMessage(T message) {
    this.message = message;
  }

  // Adapts a DataOutput into the OutputStream that CodedOutputStream requires.
  private static class DataOutputStream extends OutputStream {
    DataOutput out;
    @Override
    public void write(int b) throws IOException {
      out.write(b);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
    }
  }

  @Override
  public void write(DataOutput out) throws IOException {
    if (dos == null) {
      dos = new DataOutputStream();
      cos = CodedOutputStream.newInstance(dos);
    }
    dos.out = out;
    cos.writeMessageNoTag(message);
    // Flush so the bytes reach the DataOutput before the sequence file moves on.
    cos.flush();
  }

  // Adapts a DataInput into the InputStream that CodedInputStream requires;
  // EOF is mapped to the -1 convention of InputStream.read().
  private static class DataInputStream extends InputStream {
    DataInput in;
    @Override
    public int read() throws IOException {
      try {
        return in.readUnsignedByte();
      } catch (EOFException e) {
        return -1;
      }
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    if (din == null) {
      din = new DataInputStream();
      cin = CodedInputStream.newInstance(din);
      // Default size limit is 64MB; lift it so large events do not fail to parse.
      cin.setSizeLimit(Integer.MAX_VALUE);
    }
    // NOTE(review): cin is created once and reused across records; this relies on
    // CodedInputStream not buffering bytes past the current message — verify before
    // changing stream/buffer configuration.
    din.in = in;
    message = cin.readMessage(parser, null);
  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/24b872a7/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java new file mode 100644 index 0000000..ca9ba61 --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.tez.dag.history.logging.proto;

import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

/**
 * Writes protobuf messages of one type into a sequence file as NullWritable-keyed
 * ProtoMessageWritable values. The file is opened with appendIfExists, so reopening
 * the same path continues at the end instead of truncating.
 */
public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
  private final Path filePath;
  private final SequenceFile.Writer writer;
  // Reused for every record to avoid per-message writable allocation.
  private final ProtoMessageWritable<T> writable;

  ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
    this.filePath = filePath;
    this.writer = SequenceFile.createWriter(
        conf,
        SequenceFile.Writer.file(filePath),
        SequenceFile.Writer.keyClass(NullWritable.class),
        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
        SequenceFile.Writer.appendIfExists(true),
        // Record-level compression keeps individual records seekable by offset.
        SequenceFile.Writer.compression(CompressionType.RECORD));
    this.writable = new ProtoMessageWritable<>(parser);
  }

  public Path getPath() {
    return filePath;
  }

  /** @return the current file length, used as the offset of the next record written. */
  public long getOffset() throws IOException {
    return writer.getLength();
  }

  public void writeProto(T message) throws IOException {
    writable.setMessage(message);
    writer.append(NullWritable.get(), writable);
  }

  /** Flushes written data to the filesystem so readers can see it before close. */
  public void hflush() throws IOException {
    writer.hflush();
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.dag.history.logging.proto;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;

/**
 * Factory for the date-partitioned proto loggers used by Tez. It centralizes
 * the base-directory configuration and the fixed sub-directory layout
 * (app_data, dag_data, dag_meta) so that readers outside Tez can construct
 * loggers pointing at the same paths.
 */
public class TezProtoLoggers {
  private DatePartitionedLogger<HistoryEventProto> appLogger;
  private DatePartitionedLogger<HistoryEventProto> dagLogger;
  private DatePartitionedLogger<ManifestEntryProto> manifestLogger;

  /**
   * Creates the three loggers under the configured base directory.
   *
   * @param conf  configuration; must contain TEZ_HISTORY_LOGGING_PROTO_BASE_DIR.
   * @param clock clock used to pick the date partition for new files.
   * @return false if the base directory is not configured, true otherwise.
   * @throws IOException if a logger cannot be created.
   */
  public boolean setup(Configuration conf, Clock clock) throws IOException {
    String baseDir = conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR);
    if (baseDir == null) {
      // Proto logging is disabled when no base directory is configured.
      return false;
    }
    Path base = new Path(baseDir);
    appLogger = new DatePartitionedLogger<>(
        HistoryEventProto.PARSER, new Path(base, "app_data"), conf, clock);
    dagLogger = new DatePartitionedLogger<>(
        HistoryEventProto.PARSER, new Path(base, "dag_data"), conf, clock);
    manifestLogger = new DatePartitionedLogger<>(
        ManifestEntryProto.PARSER, new Path(base, "dag_meta"), conf, clock);
    return true;
  }

  /** Logger for application-level history events (app_data). */
  public DatePartitionedLogger<HistoryEventProto> getAppEventsLogger() {
    return appLogger;
  }

  /** Logger for per-DAG history events (dag_data). */
  public DatePartitionedLogger<HistoryEventProto> getDagEventsLogger() {
    return dagLogger;
  }

  /** Logger for manifest entries pointing into the event files (dag_meta). */
  public DatePartitionedLogger<ManifestEntryProto> getManifestEventsLogger() {
    return manifestLogger;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

option java_package = "org.apache.tez.dag.history.logging.proto";
option java_outer_classname = "HistoryLoggerProtos";

// Generic key/value pair used to carry arbitrary per-event data.
message KVPair {
  optional string key = 1;
  optional string value = 2;
}

// One history event. The entity id fields (dag/vertex/task/task attempt) are
// populated only when applicable to the event type; everything event-specific
// goes into event_data as key/value pairs.
message HistoryEventProto {
  optional string event_type = 1;
  optional int64 event_time = 2;
  optional string user = 3;
  optional string app_id = 4;
  optional string app_attempt_id = 5;
  optional string dag_id = 6;
  optional string vertex_id = 7;
  optional string task_id = 8;
  optional string task_attempt_id = 9;
  repeated KVPair event_data = 10;
}

// Manifest entry recording where a DAG's events live: the file paths and the
// byte offsets of the key events inside the app and dag event files.
message ManifestEntryProto {
  optional string dag_id = 1;
  optional string app_id = 2;
  optional int64 dag_submitted_event_offset = 3;
  optional int64 dag_finished_event_offset = 4;
  optional string dag_file_path = 5;
  // Time the entry was written, per the logger's clock.
  // NOTE(review): camelCase is inconsistent with the other snake_case fields,
  // but renaming would change the generated Java accessors (setWriteTime) —
  // leave as is.
  optional int64 writeTime = 6;
  optional string app_file_path = 7;
  optional int64 app_launched_event_offset = 8;
}
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java new file mode 100644 index 0000000..fcaa315 --- /dev/null +++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.tez.dag.history.logging.proto;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/** Tests for {@link DagManifesFileScanner}: scanning, offset save/restore. */
public class TestDagManifestFileScanner {
  private MockClock clock;
  private DatePartitionedLogger<ManifestEntryProto> manifestLogger;

  @Rule
  public TemporaryFolder tempFolder = new TemporaryFolder();

  @Before
  public void setupTest() throws Exception {
    String basePath = tempFolder.newFolder().getAbsolutePath();
    clock = new MockClock();
    Configuration conf = new Configuration(false);
    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath);
    TezProtoLoggers loggers = new TezProtoLoggers();
    loggers.setup(conf, clock);
    manifestLogger = loggers.getManifestEventsLogger();
  }

  @Test
  public void testNormal() throws Exception {
    clock.setTime(0); // 0th day.
    createManifestEvents(0, 8);
    clock.setTime((24 * 60 * 60 + 1) * 1000); // 1 day 1 sec.
    createManifestEvents(24 * 3600, 5);
    DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
    // Only day-0 events are visible; day-1 files are still too recent.
    Assert.assertEquals(8, drain(scanner));

    // Save offset so we can verify reset behavior below.
    String offset = scanner.getOffset();

    // Advance past the scanner's recency window (presumably 60s — confirm in
    // DagManifesFileScanner) so the day-1 events become visible.
    clock.setTime((24 * 60 * 60 + 61) * 1000); // 1 day 61 sec.
    Assert.assertEquals(5, drain(scanner));

    // Restoring the saved offset should replay the same 5 events.
    scanner.setOffset(offset);
    Assert.assertEquals(5, drain(scanner));

    scanner.close();

    // Not able to test append since the LocalFileSystem does not implement append.
  }

  /** Reads and counts all currently available entries from the scanner. */
  private static int drain(DagManifesFileScanner scanner) throws IOException {
    int count = 0;
    while (scanner.getNext() != null) {
      ++count;
    }
    return count;
  }

  /** Writes numEvents manifest entries, one file per synthetic app id. */
  private void createManifestEvents(long time, int numEvents) throws IOException {
    for (int i = 0; i < numEvents; ++i) {
      ApplicationId appId = ApplicationId.newInstance(1000L, i);
      ManifestEntryProto proto = ManifestEntryProto.newBuilder()
          .setAppId(appId.toString())
          .setDagFilePath("dummy_dag_path_" + i)
          .setDagSubmittedEventOffset(0)
          .setDagFinishedEventOffset(1)
          .setAppFilePath("dummy_app_path_" + i)
          .setAppLaunchedEventOffset(2)
          .setWriteTime(clock.getTime())
          .build();
      // try-with-resources ensures the writer is closed even if the write fails.
      try (ProtoMessageWriter<ManifestEntryProto> writer =
          manifestLogger.getWriter(appId.toString())) {
        writer.writeProto(proto);
      }
    }
  }

  /** Deterministic clock so the test controls date partitioning and windows. */
  private static class MockClock implements Clock {
    private long time = 0;

    void setTime(long time) {
      this.time = time;
    }

    @Override
    public long getTime() {
      return time;
    }
  }
}
