EAGLE-276 Eagle support for MR & Spark job history monitoring
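The one behavioral change to existing code in this commit is in EntityDefinitionManager: a field type with no registered EntitySerDeser no longer throws IllegalArgumentException and instead falls back to DefaultJavaObjctSerDeser, which delegates to the new SerializableUtils (plain or Snappy-compressed Java serialization). Below is a minimal, hypothetical round-trip sketch of that utility; the example class name and config key are illustrative only and are not part of this commit, and it assumes eagle-common, eagle-jpm-entity and snappy-java are on the classpath.

    import org.apache.eagle.common.SerializableUtils;
    import org.apache.eagle.jpm.entity.JobConfig;

    public class JobConfigSerDeRoundTrip {
        public static void main(String[] args) {
            JobConfig config = new JobConfig();
            config.getConfig().put("spark.executor.memory", "4g");   // illustrative key/value

            // Plain Java serialization, the same calls DefaultJavaObjctSerDeser delegates to
            byte[] bytes = SerializableUtils.serializeToByteArray(config);
            JobConfig copy = (JobConfig) SerializableUtils.deserializeFromByteArray(bytes, "spark job config");

            // Snappy-compressed variant for larger payloads
            byte[] compressed = SerializableUtils.serializeToCompressedByteArray(config);
            JobConfig copy2 = (JobConfig) SerializableUtils.deserializeFromCompressedByteArray(compressed, "spark job config");

            System.out.println(copy.getConfig());
            System.out.println(copy2.getConfig());
        }
    }

Falling back to Java serialization keeps arbitrary Serializable fields such as JobConfig storable without a hand-written EntitySerDeser, at the cost of storing opaque serialized bytes.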
Author: @wujinhu <[email protected]> Reviewer: @yonzhang <[email protected]> Closes: #217 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fe509125 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fe509125 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fe509125 Branch: refs/heads/develop Commit: fe50912574dfe2126da0154b8544d81022126acc Parents: f857cbe Author: yonzhang <[email protected]> Authored: Tue Jul 5 11:10:45 2016 -0700 Committer: yonzhang <[email protected]> Committed: Tue Jul 5 11:10:45 2016 -0700 ---------------------------------------------------------------------- .../apache/eagle/common/SerializableUtils.java | 126 ++++ .../entity/meta/DefaultJavaObjctSerDeser.java | 41 ++ .../entity/meta/EntityDefinitionManager.java | 21 +- eagle-jpm/eagle-jpa-spark-history/pom.xml | 66 ++ eagle-jpm/eagle-jpa-spark-running/pom.xml | 66 ++ eagle-jpm/eagle-jpm-entity/pom.xml | 52 ++ .../eagle/jpm/entity/JPMEntityRepository.java | 30 + .../org/apache/eagle/jpm/entity/JobConfig.java | 38 + .../org/apache/eagle/jpm/entity/SparkApp.java | 428 ++++++++++++ .../apache/eagle/jpm/entity/SparkExecutor.java | 233 +++++++ .../org/apache/eagle/jpm/entity/SparkJob.java | 178 +++++ .../org/apache/eagle/jpm/entity/SparkStage.java | 299 ++++++++ .../org/apache/eagle/jpm/entity/SparkTask.java | 290 ++++++++ eagle-jpm/eagle-jpm-mr-history/pom.xml | 138 ++++ .../assembly/eagle-jpm-mr-history-assembly.xml | 65 ++ .../eagle/jpm/mr/history/MRHistoryJobMain.java | 87 +++ .../jpm/mr/history/common/JHFConfigManager.java | 182 +++++ .../jpm/mr/history/common/JPAConstants.java | 95 +++ .../eagle/jpm/mr/history/common/JobConfig.java | 38 + .../history/crawler/AbstractJobHistoryDAO.java | 194 +++++ .../crawler/DefaultJHFInputStreamCallback.java | 66 ++ .../mr/history/crawler/JHFCrawlerDriver.java | 27 + .../history/crawler/JHFCrawlerDriverImpl.java | 277 ++++++++ .../history/crawler/JHFInputStreamCallback.java | 37 + .../crawler/JobHistoryContentFilter.java | 36 + .../crawler/JobHistoryContentFilterBuilder.java | 91 +++ .../crawler/JobHistoryContentFilterImpl.java | 94 +++ .../mr/history/crawler/JobHistoryDAOImpl.java | 203 ++++++ .../jpm/mr/history/crawler/JobHistoryLCM.java | 86 +++ .../JobHistorySpoutCollectorInterceptor.java | 36 + .../history/entities/JPAEntityRepository.java | 40 ++ .../mr/history/entities/JobBaseAPIEntity.java | 24 + .../mr/history/entities/JobConfigSerDeser.java | 63 ++ .../entities/JobConfigurationAPIEntity.java | 67 ++ .../history/entities/JobCountersSerDeser.java | 166 +++++ .../mr/history/entities/JobEventAPIEntity.java | 44 ++ .../history/entities/JobExecutionAPIEntity.java | 132 ++++ .../entities/JobProcessTimeStampEntity.java | 44 ++ .../entities/TaskAttemptCounterAPIEntity.java | 61 ++ .../entities/TaskAttemptExecutionAPIEntity.java | 101 +++ .../entities/TaskExecutionAPIEntity.java | 89 +++ .../entities/TaskFailureCountAPIEntity.java | 67 ++ .../jobcounter/CounterGroupDictionary.java | 238 +++++++ .../mr/history/jobcounter/CounterGroupKey.java | 32 + .../jpm/mr/history/jobcounter/CounterKey.java | 30 + .../history/jobcounter/JobCounterException.java | 63 ++ .../jpm/mr/history/jobcounter/JobCounters.java | 47 ++ .../jpm/mr/history/parser/EagleJobStatus.java | 28 + .../jpm/mr/history/parser/EagleJobTagName.java | 48 ++ .../jpm/mr/history/parser/EagleTaskStatus.java | 25 + .../HistoryJobEntityCreationListener.java | 39 ++ .../HistoryJobEntityLifecycleListener.java | 
34 + .../jpm/mr/history/parser/ImportException.java | 33 + .../mr/history/parser/JHFEventReaderBase.java | 405 +++++++++++ .../eagle/jpm/mr/history/parser/JHFFormat.java | 24 + .../mr/history/parser/JHFMRVer1EventReader.java | 150 ++++ .../jpm/mr/history/parser/JHFMRVer1Parser.java | 271 +++++++ .../parser/JHFMRVer1PerLineListener.java | 39 ++ .../mr/history/parser/JHFMRVer2EventReader.java | 380 ++++++++++ .../jpm/mr/history/parser/JHFMRVer2Parser.java | 87 +++ .../jpm/mr/history/parser/JHFParserBase.java | 35 + .../jpm/mr/history/parser/JHFParserFactory.java | 71 ++ .../parser/JHFWriteNotCompletedException.java | 36 + ...JobConfigurationCreationServiceListener.java | 92 +++ .../JobEntityCreationEagleServiceListener.java | 127 ++++ .../parser/JobEntityCreationPublisher.java | 47 ++ .../parser/JobEntityLifecycleAggregator.java | 176 +++++ .../mr/history/parser/MRErrorClassifier.java | 112 +++ .../jpm/mr/history/parser/RecordTypes.java | 26 + .../parser/TaskAttemptCounterListener.java | 152 ++++ .../mr/history/parser/TaskFailureListener.java | 137 ++++ .../history/storm/DefaultJobIdPartitioner.java | 28 + .../history/storm/HistoryJobProgressBolt.java | 132 ++++ .../jpm/mr/history/storm/JobHistorySpout.java | 208 ++++++ .../eagle/jpm/mr/history/storm/JobIdFilter.java | 23 + .../history/storm/JobIdFilterByPartition.java | 40 ++ .../jpm/mr/history/storm/JobIdPartitioner.java | 23 + .../mr/history/zkres/JobHistoryZKStateLCM.java | 31 + .../history/zkres/JobHistoryZKStateManager.java | 305 ++++++++ .../src/main/resources/JobCounter.conf | 185 +++++ .../services/org.apache.hadoop.fs.FileSystem | 20 + .../src/main/resources/MRErrorCategory.config | 41 ++ .../src/main/resources/application.conf | 85 +++ .../src/main/resources/core-site.xml | 497 +++++++++++++ .../src/main/resources/hdfs-site.xml | 449 ++++++++++++ .../src/main/resources/log4j.properties | 34 + eagle-jpm/eagle-jpm-spark-history/pom.xml | 122 ++++ .../eagle-jpm-spark-history-assembly.xml | 65 ++ .../history/config/SparkHistoryCrawlConfig.java | 122 ++++ .../jpm/spark/history/crawl/EventType.java | 24 + .../history/crawl/JHFInputStreamReader.java | 25 + .../jpm/spark/history/crawl/JHFParserBase.java | 29 + .../history/crawl/JHFSparkEventReader.java | 699 +++++++++++++++++++ .../jpm/spark/history/crawl/JHFSparkParser.java | 63 ++ .../history/crawl/SparkApplicationInfo.java | 69 ++ .../SparkHistoryFileInputStreamReaderImpl.java | 53 ++ .../status/JobHistoryZKStateManager.java | 262 +++++++ .../spark/history/status/ZKStateConstant.java | 27 + .../history/storm/FinishedSparkJobSpout.java | 152 ++++ .../history/storm/SparkHistoryTopology.java | 81 +++ .../spark/history/storm/SparkJobParseBolt.java | 178 +++++ .../eagle/jpm/spark/history/storm/TestHDFS.java | 47 ++ .../services/org.apache.hadoop.fs.FileSystem | 20 + .../src/main/resources/application.conf | 77 ++ .../src/main/resources/log4j.properties | 35 + eagle-jpm/eagle-jpm-spark-running/pom.xml | 66 ++ eagle-jpm/eagle-jpm-util/pom.xml | 65 ++ .../org/apache/eagle/jpm/util/Constants.java | 49 ++ .../org/apache/eagle/jpm/util/HDFSUtil.java | 44 ++ .../org/apache/eagle/jpm/util/JSONUtil.java | 66 ++ .../eagle/jpm/util/JobNameNormalization.java | 118 ++++ .../eagle/jpm/util/SparkEntityConstant.java | 29 + .../apache/eagle/jpm/util/SparkJobTagName.java | 44 ++ .../util/resourceFetch/RMResourceFetcher.java | 98 +++ .../jpm/util/resourceFetch/ResourceFetcher.java | 27 + .../SparkHistoryServerResourceFetcher.java | 81 +++ .../connection/InputStreamUtils.java | 69 ++ 
.../util/resourceFetch/connection/JobUtils.java | 43 ++ .../connection/URLConnectionUtils.java | 102 +++ .../util/resourceFetch/ha/HAURLSelector.java | 28 + .../resourceFetch/ha/HAURLSelectorImpl.java | 101 +++ .../jpm/util/resourceFetch/model/AppInfo.java | 146 ++++ .../util/resourceFetch/model/Applications.java | 38 + .../util/resourceFetch/model/AppsWrapper.java | 36 + .../resourceFetch/model/SparkApplication.java | 57 ++ .../model/SparkApplicationAttempt.java | 73 ++ .../model/SparkApplicationWrapper.java | 38 + .../url/JobListServiceURLBuilderImpl.java | 37 + .../resourceFetch/url/ServiceURLBuilder.java | 21 + .../SparkCompleteJobServiceURLBuilderImpl.java | 29 + .../url/SparkJobServiceURLBuilderImpl.java | 29 + .../src/main/resources/application.properties | 23 + eagle-jpm/pom.xml | 54 ++ 133 files changed, 13354 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java new file mode 100644 index 0000000..c5823ea --- /dev/null +++ b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java @@ -0,0 +1,126 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.common; + +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; + +import java.io.*; + +/** + * Utilities for working with Serializables. + * + * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils": + * https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java + */ +public class SerializableUtils { + /** + * Serializes the argument into an array of bytes, and returns it. + * + * @throws IllegalArgumentException if there are errors when serializing + */ + public static byte[] serializeToCompressedByteArray(Object value) { + try { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) { + oos.writeObject(value); + } + return buffer.toByteArray(); + } catch (IOException exn) { + throw new IllegalArgumentException( + "unable to serialize " + value, + exn); + } + } + + /** + * Serializes the argument into an array of bytes, and returns it. 
+ * + * @throws IllegalArgumentException if there are errors when serializing + */ + public static byte[] serializeToByteArray(Object value) { + try { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) { + oos.writeObject(value); + } + return buffer.toByteArray(); + } catch (IOException exn) { + throw new IllegalArgumentException("unable to serialize " + value, exn); + } + } + + /** + * Deserializes an object from the given array of bytes, e.g., as + * serialized using {@link #serializeToCompressedByteArray}, and returns it. + * + * @throws IllegalArgumentException if there are errors when + * deserializing, using the provided description to identify what + * was being deserialized + */ + public static Object deserializeFromByteArray(byte[] encodedValue, + String description) { + try { + try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) { + return ois.readObject(); + } + } catch (IOException | ClassNotFoundException exn) { + throw new IllegalArgumentException( + "unable to deserialize " + description, + exn); + } + } + + /** + * Deserializes an object from the given array of bytes, e.g., as + * serialized using {@link #serializeToCompressedByteArray}, and returns it. + * + * @throws IllegalArgumentException if there are errors when + * deserializing, using the provided description to identify what + * was being deserialized + */ + public static Object deserializeFromCompressedByteArray(byte[] encodedValue, + String description) { + try { + try (ObjectInputStream ois = new ObjectInputStream( + new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) { + return ois.readObject(); + } + } catch (IOException | ClassNotFoundException exn) { + throw new IllegalArgumentException( + "unable to deserialize " + description, + exn); + } + } + + public static <T extends Serializable> T ensureSerializable(T value) { + @SuppressWarnings("unchecked") + T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value), + value.toString()); + return copy; + } + + public static <T extends Serializable> T clone(T value) { + @SuppressWarnings("unchecked") + T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value), + value.toString()); + return copy; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java new file mode 100644 index 0000000..24385a9 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java @@ -0,0 +1,41 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.log.entity.meta; + +import org.apache.eagle.common.SerializableUtils; + +public class DefaultJavaObjctSerDeser implements EntitySerDeser<Object> { + public final static EntitySerDeser<Object> INSTANCE = new DefaultJavaObjctSerDeser(); + + @Override + public Object deserialize(byte[] bytes) { + return SerializableUtils.deserializeFromByteArray(bytes,"Deserialize from java object bytes"); + } + + @Override + public byte[] serialize(Object o) { + return SerializableUtils.serializeToByteArray(o); + } + + @Override + public Class<Object> type() { + return Object.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java index d990fb5..7b1010d 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java @@ -16,12 +16,6 @@ */ package org.apache.eagle.log.entity.meta; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.eagle.common.config.EagleConfigFactory; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepositoryScanner; @@ -31,6 +25,12 @@ import org.mockito.cglib.core.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * static initialization of all registered entities. 
As of now, dynamic registration is not supported */ @@ -365,11 +365,12 @@ public class EntityDefinitionManager { q.setQualifierName(column.value()); EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls); if(serDeser == null){ - throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() + - " of entity " + cls.getSimpleName() + " has no serializer associated "); - } else { - q.setSerDeser((EntitySerDeser<Object>)serDeser); +// throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() + +// " of entity " + cls.getSimpleName() + " has no serializer associated "); + serDeser = DefaultJavaObjctSerDeser.INSTANCE; } + + q.setSerDeser((EntitySerDeser<Object>)serDeser); ed.getQualifierNameMap().put(q.getQualifierName(), q); ed.getDisplayNameMap().put(q.getDisplayName(), q); // TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() according to org.apache.commons.beanutils.PropertyUtils http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpa-spark-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpa-spark-history/pom.xml b/eagle-jpm/eagle-jpa-spark-history/pom.xml new file mode 100644 index 0000000..cc293b6 --- /dev/null +++ b/eagle-jpm/eagle-jpa-spark-history/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-parent</artifactId> + <version>0.3.0-incubating</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>eagle-jpm-spark-history</artifactId> + <name>eagle-jpm-spark-history</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-job-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpa-spark-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpa-spark-running/pom.xml b/eagle-jpm/eagle-jpa-spark-running/pom.xml new file mode 100644 index 0000000..42c476a --- /dev/null +++ b/eagle-jpm/eagle-jpa-spark-running/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-parent</artifactId> + <version>0.3.0-incubating</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>eagle-jpm-spark-running</artifactId> + <name>eagle-jpm-spark-running</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-job-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/pom.xml b/eagle-jpm/eagle-jpm-entity/pom.xml new file mode 100644 index 0000000..29be4ab --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/pom.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>eagle-jpm-parent</artifactId> + <groupId>org.apache.eagle</groupId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>eagle-jpm-entity</artifactId> + <packaging>jar</packaging> + + <name>eagle-jpm-entity</name> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java new file mode 100644 index 0000000..f54688b --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.entity; + +import org.apache.eagle.log.entity.repo.EntityRepository; + +public class JPMEntityRepository extends EntityRepository { + public JPMEntityRepository() { + entitySet.add(SparkApp.class); + entitySet.add(SparkJob.class); + entitySet.add(SparkStage.class); + entitySet.add(SparkTask.class); + entitySet.add(SparkExecutor.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java new file mode 100644 index 0000000..de3bd7a --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.entity; + +import java.io.Serializable; +import java.util.Map; +import java.util.TreeMap; + +public class JobConfig implements Serializable { + private Map<String, String> config = new TreeMap<>(); + + public Map<String, String> getConfig() { + return config; + } + + public void setConfig(Map<String, String> config) { + this.config = config; + } + @Override + public String toString(){ + return config.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java new file mode 100644 index 0000000..1760753 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.entity; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.apache.eagle.jpm.util.Constants; + +@Table("eglesprk_apps") +@ColumnFamily("f") +@Prefix("sprkapp") +@Service(Constants.SPARK_APP_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName","user", "queue"}) +@Partition({"site"}) +public class SparkApp extends TaggedLogAPIEntity{ + + @Column("a") + private long startTime; + @Column("b") + private long endTime; + @Column("c") + private String yarnState; + @Column("d") + private String yarnStatus; + @Column("e") + private JobConfig config; + @Column("f") + private int numJobs; + @Column("g") + private int totalStages; + @Column("h") + private int skippedStages; + @Column("i") + private int failedStages; + @Column("j") + private int totalTasks; + @Column("k") + private int skippedTasks; + @Column("l") + private int failedTasks; + @Column("m") + private int executors; + @Column("n") + private long inputBytes; + @Column("o") + private long inputRecords; + @Column("p") + private long outputBytes; + @Column("q") + private long outputRecords; + @Column("r") + private long shuffleReadBytes; + @Column("s") + private long shuffleReadRecords; + @Column("t") + private long shuffleWriteBytes; + @Column("u") + private long shuffleWriteRecords; + @Column("v") + private long executorDeserializeTime; + @Column("w") + private long executorRunTime; + @Column("x") + private long resultSize; + @Column("y") + private long jvmGcTime; + @Column("z") + private long resultSerializationTime; + @Column("ab") + private long memoryBytesSpilled; + @Column("ac") + private long diskBytesSpilled; + @Column("ad") + private long execMemoryBytes; + @Column("ae") + private long driveMemoryBytes; + @Column("af") + private int completeTasks; + @Column("ag") + private long totalExecutorTime; + @Column("ah") + private long executorMemoryOverhead; + @Column("ai") + private long driverMemoryOverhead; + @Column("aj") + private int executorCores; + @Column("ak") + private int driverCores; + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + public String getYarnState() { + return yarnState; + } + + public String getYarnStatus() { + return yarnStatus; + } + + public int getNumJobs() { + return numJobs; + } + + public int getTotalStages() { + return totalStages; + } + + public int getSkippedStages() { + return skippedStages; + } + + public int getFailedStages() { + return failedStages; + } + + public int getTotalTasks() { + return totalTasks; + } + + public int getSkippedTasks() { + return skippedTasks; + } + + public int getFailedTasks() { + return failedTasks; + } + + public int getExecutors() { + return executors; + } + + public long getInputBytes() { + return inputBytes; + } + + public long getInputRecords() { + return inputRecords; + } + + public long getOutputBytes() { + return outputBytes; + } + + public long getOutputRecords() { + return outputRecords; + } + + public long getShuffleReadBytes() { + return shuffleReadBytes; + } + + public long getShuffleReadRecords() { + return shuffleReadRecords; + } + + public long getShuffleWriteBytes() { + return shuffleWriteBytes; + } + + public long getShuffleWriteRecords() { + return shuffleWriteRecords; + } + + public long getExecutorDeserializeTime() 
{ + return executorDeserializeTime; + } + + public long getExecutorRunTime() { + return executorRunTime; + } + + public long getResultSize() { + return resultSize; + } + + public long getJvmGcTime() { + return jvmGcTime; + } + + public long getResultSerializationTime() { + return resultSerializationTime; + } + + public long getMemoryBytesSpilled() { + return memoryBytesSpilled; + } + + public long getDiskBytesSpilled() { + return diskBytesSpilled; + } + + public long getExecMemoryBytes() { + return execMemoryBytes; + } + + public long getDriveMemoryBytes() { + return driveMemoryBytes; + } + + public int getCompleteTasks(){ return completeTasks;} + + public JobConfig getConfig() { + return config; + } + public void setStartTime(long startTime) { + this.startTime = startTime; + valueChanged("startTime"); + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + valueChanged("endTime"); + } + + public void setYarnState(String yarnState) { + this.yarnState = yarnState; + valueChanged("yarnState"); + } + + public void setYarnStatus(String yarnStatus) { + this.yarnStatus = yarnStatus; + valueChanged("yarnStatus"); + } + + public void setConfig(JobConfig config) { + this.config = config; + valueChanged("config"); + } + + public void setNumJobs(int numJobs) { + this.numJobs = numJobs; + valueChanged("numJobs"); + } + + public void setTotalStages(int totalStages) { + this.totalStages = totalStages; + valueChanged("totalStages"); + } + + public void setSkippedStages(int skippedStages) { + this.skippedStages = skippedStages; + valueChanged("skippedStages"); + } + + public void setFailedStages(int failedStages) { + this.failedStages = failedStages; + valueChanged("failedStages"); + } + + public void setTotalTasks(int totalTasks) { + this.totalTasks = totalTasks; + valueChanged("totalTasks"); + } + + public void setSkippedTasks(int skippedTasks) { + this.skippedTasks = skippedTasks; + valueChanged("skippedTasks"); + } + + public void setFailedTasks(int failedTasks) { + this.failedTasks = failedTasks; + valueChanged("failedTasks"); + } + + public void setExecutors(int executors) { + this.executors = executors; + valueChanged("executors"); + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + valueChanged("inputBytes"); + } + + public void setInputRecords(long inputRecords) { + this.inputRecords = inputRecords; + valueChanged("inputRecords"); + } + + public void setOutputBytes(long outputBytes) { + this.outputBytes = outputBytes; + valueChanged("outputBytes"); + } + + public void setOutputRecords(long outputRecords) { + this.outputRecords = outputRecords; + valueChanged("outputRecords"); + } + + public void setShuffleReadBytes(long shuffleReadRemoteBytes) { + this.shuffleReadBytes = shuffleReadRemoteBytes; + valueChanged("shuffleReadBytes"); + } + + public void setShuffleReadRecords(long shuffleReadRecords) { + this.shuffleReadRecords = shuffleReadRecords; + valueChanged("shuffleReadRecords"); + } + + public void setShuffleWriteBytes(long shuffleWriteBytes) { + this.shuffleWriteBytes = shuffleWriteBytes; + valueChanged("shuffleWriteBytes"); + } + + public void setShuffleWriteRecords(long shuffleWriteRecords) { + this.shuffleWriteRecords = shuffleWriteRecords; + valueChanged("shuffleWriteRecords"); + } + + public void setExecutorDeserializeTime(long executorDeserializeTime) { + this.executorDeserializeTime = executorDeserializeTime; + valueChanged("executorDeserializeTime"); + } + + public void setExecutorRunTime(long executorRunTime) { + 
this.executorRunTime = executorRunTime; + valueChanged("executorRunTime"); + } + + public void setResultSize(long resultSize) { + this.resultSize = resultSize; + valueChanged("resultSize"); + } + + public void setJvmGcTime(long jvmGcTime) { + this.jvmGcTime = jvmGcTime; + valueChanged("jvmGcTime"); + } + + public void setResultSerializationTime(long resultSerializationTime) { + this.resultSerializationTime = resultSerializationTime; + valueChanged("resultSerializationTime"); + } + + public void setMemoryBytesSpilled(long memoryBytesSpilled) { + this.memoryBytesSpilled = memoryBytesSpilled; + valueChanged("memoryBytesSpilled"); + } + + public void setDiskBytesSpilled(long diskBytesSpilled) { + this.diskBytesSpilled = diskBytesSpilled; + valueChanged("diskBytesSpilled"); + } + + public void setExecMemoryBytes(long execMemoryBytes) { + this.execMemoryBytes = execMemoryBytes; + valueChanged("execMemoryBytes"); + } + + public void setDriveMemoryBytes(long driveMemoryBytes) { + this.driveMemoryBytes = driveMemoryBytes; + valueChanged("driveMemoryBytes"); + } + + public void setCompleteTasks(int completeTasks){ + this.completeTasks = completeTasks; + valueChanged("completeTasks"); + } + + public long getTotalExecutorTime() { + return totalExecutorTime; + } + + public void setTotalExecutorTime(long totalExecutorTime) { + this.totalExecutorTime = totalExecutorTime; + valueChanged("totalExecutorTime"); + } + + public long getExecutorMemoryOverhead() { + return executorMemoryOverhead; + } + + public void setExecutorMemoryOverhead(long executorMemoryOverhead) { + this.executorMemoryOverhead = executorMemoryOverhead; + valueChanged("executorMemoryOverhead"); + } + + public long getDriverMemoryOverhead() { + return driverMemoryOverhead; + } + + public void setDriverMemoryOverhead(long driverMemoryOverhead) { + this.driverMemoryOverhead = driverMemoryOverhead; + valueChanged("driverMemoryOverhead"); + } + + public int getExecutorCores() { + return executorCores; + } + + public void setExecutorCores(int executorCores) { + this.executorCores = executorCores; + valueChanged("executorCores"); + } + + public int getDriverCores() { + return driverCores; + } + + public void setDriverCores(int driverCores) { + this.driverCores = driverCores; + valueChanged("driverCores"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java new file mode 100644 index 0000000..92cb130 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.entity; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.apache.eagle.jpm.util.Constants; + +@Table("eglesprk_executors") +@ColumnFamily("f") +@Prefix("sprkexcutr") +@Service(Constants.SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "executorId","user", "queue"}) +@Partition({"site"}) +public class SparkExecutor extends TaggedLogAPIEntity{ + + @Column("a") + private String hostPort; + @Column("b") + private int rddBlocks; + @Column("c") + private long memoryUsed; + @Column("d") + private long diskUsed; + @Column("e") + private int activeTasks = 0; + @Column("f") + private int failedTasks = 0; + @Column("g") + private int completedTasks = 0; + @Column("h") + private int totalTasks = 0; + @Column("i") + private long totalDuration = 0; + @Column("j") + private long totalInputBytes = 0; + @Column("k") + private long totalShuffleRead = 0; + @Column("l") + private long totalShuffleWrite = 0; + @Column("m") + private long maxMemory; + @Column("n") + private long startTime; + @Column("o") + private long endTime = 0; + @Column("p") + private long execMemoryBytes; + @Column("q") + private int cores; + @Column("r") + private long memoryOverhead; + + public String getHostPort() { + return hostPort; + } + + public void setHostPort(String hostPort) { + this.hostPort = hostPort; + this.valueChanged("hostPort"); + } + + public int getRddBlocks() { + return rddBlocks; + } + + public void setRddBlocks(int rddBlocks) { + this.rddBlocks = rddBlocks; + this.valueChanged("rddBlocks"); + } + + public long getMemoryUsed() { + return memoryUsed; + } + + public void setMemoryUsed(long memoryUsed) { + this.memoryUsed = memoryUsed; + this.valueChanged("memoryUsed"); + } + + public long getDiskUsed() { + return diskUsed; + } + + public void setDiskUsed(long diskUsed) { + this.diskUsed = diskUsed; + this.valueChanged("diskUsed"); + } + + public int getActiveTasks() { + return activeTasks; + } + + public void setActiveTasks(int activeTasks) { + this.activeTasks = activeTasks; + this.valueChanged("activeTasks"); + } + + public int getFailedTasks() { + return failedTasks; + } + + public void setFailedTasks(int failedTasks) { + this.failedTasks = failedTasks; + this.valueChanged("failedTasks"); + } + + public int getCompletedTasks() { + return completedTasks; + } + + public void setCompletedTasks(int completedTasks) { + this.completedTasks = completedTasks; + this.valueChanged("completedTasks"); + } + + public int getTotalTasks() { + return totalTasks; + } + + public void setTotalTasks(int totalTasks) { + this.totalTasks = totalTasks; + this.valueChanged("totalTasks"); + } + + public long getTotalDuration() { + return totalDuration; + } + + public void setTotalDuration(long totalDuration) { + this.totalDuration = totalDuration; + this.valueChanged("totalDuration"); + } + + public long getTotalInputBytes() { + 
return totalInputBytes; + } + + public void setTotalInputBytes(long totalInputBytes) { + this.totalInputBytes = totalInputBytes; + this.valueChanged("totalInputBytes"); + } + + public long getTotalShuffleRead() { + return totalShuffleRead; + } + + public void setTotalShuffleRead(long totalShuffleRead) { + this.totalShuffleRead = totalShuffleRead; + this.valueChanged("totalShuffleRead"); + } + + public long getTotalShuffleWrite() { + return totalShuffleWrite; + } + + public void setTotalShuffleWrite(long totalShuffleWrite) { + this.totalShuffleWrite = totalShuffleWrite; + this.valueChanged("totalShuffleWrite"); + } + + public long getMaxMemory() { + return maxMemory; + } + + public void setMaxMemory(long maxMemory) { + this.maxMemory = maxMemory; + this.valueChanged("maxMemory"); + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + valueChanged("startTime"); + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + this.valueChanged("endTime"); + } + + public long getExecMemoryBytes() { + return execMemoryBytes; + } + + public void setExecMemoryBytes(long execMemoryBytes) { + this.execMemoryBytes = execMemoryBytes; + this.valueChanged("execMemoryBytes"); + } + + public int getCores() { + return cores; + } + + public void setCores(int cores) { + this.cores = cores; + valueChanged("cores"); + } + + public long getMemoryOverhead() { + return memoryOverhead; + } + + public void setMemoryOverhead(long memoryOverhead) { + this.memoryOverhead = memoryOverhead; + valueChanged("memoryOverhead"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java new file mode 100644 index 0000000..a641440 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.entity; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.apache.eagle.jpm.util.Constants; + +@Table("eglesprk_jobs") +@ColumnFamily("f") +@Prefix("sprkjob") +@Service(Constants.SPARK_JOB_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId","user", "queue"}) +@Partition({"site"}) +public class SparkJob extends TaggedLogAPIEntity{ + + @Column("a") + private long submissionTime; + @Column("b") + private long completionTime; + @Column("c") + private int numStages=0; + @Column("d") + private String status; + @Column("e") + private int numTask=0; + @Column("f") + private int numActiveTasks=0; + @Column("g") + private int numCompletedTasks=0; + @Column("h") + private int numSkippedTasks=0; + @Column("i") + private int numFailedTasks=0; + @Column("j") + private int numActiveStages=0; + @Column("k") + private int numCompletedStages=0; + @Column("l") + private int numSkippedStages=0; + @Column("m") + private int numFailedStages=0; + + public long getSubmissionTime() { + return submissionTime; + } + + public long getCompletionTime() { + return completionTime; + } + + public int getNumStages() { + return numStages; + } + + public String getStatus() { + return status; + } + + public int getNumTask() { + return numTask; + } + + public int getNumActiveTasks() { + return numActiveTasks; + } + + public int getNumCompletedTasks() { + return numCompletedTasks; + } + + public int getNumSkippedTasks() { + return numSkippedTasks; + } + + public int getNumFailedTasks() { + return numFailedTasks; + } + + public int getNumActiveStages() { + return numActiveStages; + } + + public int getNumCompletedStages() { + return numCompletedStages; + } + + public int getNumSkippedStages() { + return numSkippedStages; + } + + public int getNumFailedStages() { + return numFailedStages; + } + + public void setSubmissionTime(long submissionTime) { + this.submissionTime = submissionTime; + this.valueChanged("submissionTime"); + } + + public void setCompletionTime(long completionTime) { + this.completionTime = completionTime; + this.valueChanged("completionTime"); + } + + public void setNumStages(int numStages) { + this.numStages = numStages; + this.valueChanged("numStages"); + } + + public void setStatus(String status) { + this.status = status; + this.valueChanged("status"); + } + + public void setNumTask(int numTask) { + this.numTask = numTask; + this.valueChanged("numTask"); + } + + public void setNumActiveTasks(int numActiveTasks) { + this.numActiveTasks = numActiveTasks; + this.valueChanged("numActiveTasks"); + } + + public void setNumCompletedTasks(int numCompletedTasks) { + this.numCompletedTasks = numCompletedTasks; + this.valueChanged("numCompletedTasks"); + } + + public void setNumSkippedTasks(int numSkippedTasks) { + this.numSkippedTasks = numSkippedTasks; + this.valueChanged("numSkippedTasks"); + } + + public void setNumFailedTasks(int numFailedTasks) { + this.numFailedTasks = numFailedTasks; + this.valueChanged("numFailedTasks"); + } + + public void setNumActiveStages(int numActiveStages) { + this.numActiveStages = numActiveStages; + this.valueChanged("numActiveStages"); + } + + public void setNumCompletedStages(int numCompletedStages) { + this.numCompletedStages = numCompletedStages; + this.valueChanged("numCompletedStages"); + } 
+ + public void setNumSkippedStages(int numSkippedStages) { + this.numSkippedStages = numSkippedStages; + this.valueChanged("numSkippedStages"); + } + + public void setNumFailedStages(int numFailedStages) { + this.numFailedStages = numFailedStages; + this.valueChanged("numFailedStages"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java new file mode 100644 index 0000000..92714bf --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.entity; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.apache.eagle.jpm.util.Constants; + +@Table("eglesprk_stages") +@ColumnFamily("f") +@Prefix("sprkstage") +@Service(Constants.SPARK_STAGE_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "stageId","stageAttemptId","user", "queue"}) +@Partition({"site"}) +public class SparkStage extends TaggedLogAPIEntity{ + + @Column("a") + private String status; + @Column("b") + private int numActiveTasks=0; + @Column("c") + private int numCompletedTasks=0; + @Column("d") + private int numFailedTasks=0; + @Column("e") + private long executorRunTime=0l; + @Column("f") + private long inputBytes=0l; + @Column("g") + private long inputRecords=0l; + @Column("h") + private long outputBytes=0l; + @Column("i") + private long outputRecords=0l; + @Column("j") + private long shuffleReadBytes=0l; + @Column("k") + private long shuffleReadRecords=0l; + @Column("l") + private long shuffleWriteBytes=0l; + @Column("m") + private long shuffleWriteRecords=0l; + @Column("n") + private long memoryBytesSpilled=0l; + @Column("o") + private long diskBytesSpilled=0l; + @Column("p") + private String name; + @Column("q") + private String schedulingPool; + @Column("r") + private long submitTime; + @Column("s") + private long completeTime; + @Column("t") + private int numTasks; + @Column("u") + private long executorDeserializeTime; + @Column("v") + private long resultSize; + @Column("w") + private long jvmGcTime; + @Column("x") + private long resultSerializationTime; + + public String getStatus() { + return status; + } + + public int 
getNumActiveTasks() { + return numActiveTasks; + } + + public int getNumCompletedTasks() { + return numCompletedTasks; + } + + public int getNumFailedTasks() { + return numFailedTasks; + } + + public long getExecutorRunTime() { + return executorRunTime; + } + + public long getInputBytes() { + return inputBytes; + } + + public long getInputRecords() { + return inputRecords; + } + + public long getOutputBytes() { + return outputBytes; + } + + public long getOutputRecords() { + return outputRecords; + } + + public long getShuffleReadBytes() { + return shuffleReadBytes; + } + + public long getShuffleReadRecords() { + return shuffleReadRecords; + } + + public long getShuffleWriteBytes() { + return shuffleWriteBytes; + } + + public long getShuffleWriteRecords() { + return shuffleWriteRecords; + } + + public long getMemoryBytesSpilled() { + return memoryBytesSpilled; + } + + public long getDiskBytesSpilled() { + return diskBytesSpilled; + } + + public String getName() { + return name; + } + + public String getSchedulingPool() { + return schedulingPool; + } + + public long getSubmitTime() { + return submitTime; + } + + public long getCompleteTime() { + return completeTime; + } + + public int getNumTasks() { + return numTasks; + } + + public long getExecutorDeserializeTime() { + return executorDeserializeTime; + } + + public long getResultSize() { + return resultSize; + } + + public long getJvmGcTime() { + return jvmGcTime; + } + + public long getResultSerializationTime() { + return resultSerializationTime; + } + + public void setStatus(String status) { + this.status = status; + this.valueChanged("status"); + } + + public void setNumActiveTasks(int numActiveTasks) { + this.numActiveTasks = numActiveTasks; + this.valueChanged("numActiveTasks"); + } + + public void setNumCompletedTasks(int numCompletedTasks) { + this.numCompletedTasks = numCompletedTasks; + this.valueChanged("numCompletedTasks"); + } + + public void setNumFailedTasks(int numFailedTasks) { + this.numFailedTasks = numFailedTasks; + this.valueChanged("numFailedTasks"); + } + + public void setExecutorRunTime(long executorRunTime) { + this.executorRunTime = executorRunTime; + this.valueChanged("executorRunTime"); + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + this.valueChanged("inputBytes"); + } + + public void setInputRecords(long inputRecords) { + this.inputRecords = inputRecords; + this.valueChanged("inputRecords"); + } + + public void setOutputBytes(long outputBytes) { + this.outputBytes = outputBytes; + this.valueChanged("outputBytes"); + } + + public void setOutputRecords(long outputRecords) { + this.outputRecords = outputRecords; + this.valueChanged("outputRecords"); + } + + public void setShuffleReadBytes(long shuffleReadBytes) { + this.shuffleReadBytes = shuffleReadBytes; + this.valueChanged("shuffleReadBytes"); + } + + public void setShuffleReadRecords(long shuffleReadRecords) { + this.shuffleReadRecords = shuffleReadRecords; + this.valueChanged("shuffleReadRecords"); + } + + public void setShuffleWriteBytes(long shuffleWriteBytes) { + this.shuffleWriteBytes = shuffleWriteBytes; + this.valueChanged("shuffleWriteBytes"); + } + + public void setShuffleWriteRecords(long shuffleWriteRecords) { + this.shuffleWriteRecords = shuffleWriteRecords; + this.valueChanged("shuffleWriteRecords"); + } + + public void setMemoryBytesSpilled(long memoryBytesSpilled) { + this.memoryBytesSpilled = memoryBytesSpilled; + this.valueChanged("memoryBytesSpilled"); + } + + public void setDiskBytesSpilled(long 
diskBytesSpilled) { + this.diskBytesSpilled = diskBytesSpilled; + this.valueChanged("diskBytesSpilled"); + } + + public void setName(String name) { + this.name = name; + this.valueChanged("name"); + } + + public void setSchedulingPool(String schedulingPool) { + this.schedulingPool = schedulingPool; + this.valueChanged("schedulingPool"); + } + + public void setSubmitTime(long submitTime) { + this.submitTime = submitTime; + this.valueChanged("submitTime"); + } + + public void setCompleteTime(long completeTime) { + this.completeTime = completeTime; + this.valueChanged("completeTime"); + } + + public void setNumTasks(int numTasks) { + this.numTasks = numTasks; + valueChanged("numTasks"); + } + + public void setExecutorDeserializeTime(long executorDeserializeTime) { + this.executorDeserializeTime = executorDeserializeTime; + valueChanged("executorDeserializeTime"); + } + + public void setResultSize(long resultSize) { + this.resultSize = resultSize; + valueChanged("resultSize"); + } + + public void setJvmGcTime(long jvmGcTime) { + this.jvmGcTime = jvmGcTime; + valueChanged("jvmGcTime"); + } + + public void setResultSerializationTime(long resultSerializationTime) { + this.resultSerializationTime = resultSerializationTime; + valueChanged("resultSerializationTime"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java new file mode 100644 index 0000000..af9ed21 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.entity; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.apache.eagle.jpm.util.Constants; + +@Table("eglesprk_tasks") +@ColumnFamily("f") +@Prefix("sprktask") +@Service(Constants.SPARK_TASK_SERVICE_ENDPOINT_NAME) +@JsonIgnoreProperties(ignoreUnknown = true) +@TimeSeries(true) +@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"}) +@Partition({"site"}) +public class SparkTask extends TaggedLogAPIEntity{ + + @Column("a") + private int taskId; + @Column("b") + private long launchTime; + @Column("c") + private String executorId; + @Column("d") + private String host; + @Column("e") + private String taskLocality; + @Column("f") + private boolean speculative; + @Column("g") + private long executorDeserializeTime; + @Column("h") + private long executorRunTime; + @Column("i") + private long resultSize; + @Column("j") + private long jvmGcTime; + @Column("k") + private long resultSerializationTime; + @Column("l") + private long memoryBytesSpilled; + @Column("m") + private long diskBytesSpilled; + @Column("n") + private long inputBytes; + @Column("o") + private long inputRecords; + @Column("p") + private long outputBytes; + @Column("q") + private long outputRecords; + @Column("r") + private long shuffleReadRemoteBytes; + @Column("x") + private long shuffleReadLocalBytes; + @Column("s") + private long shuffleReadRecords; + @Column("t") + private long shuffleWriteBytes; + @Column("u") + private long shuffleWriteRecords; + @Column("v") + private boolean failed; + + public int getTaskId() { + return taskId; + } + + public long getLaunchTime() { + return launchTime; + } + + public String getExecutorId() { + return executorId; + } + + public String getHost() { + return host; + } + + public String getTaskLocality() { + return taskLocality; + } + + public boolean isSpeculative() { + return speculative; + } + + public long getExecutorDeserializeTime() { + return executorDeserializeTime; + } + + public long getExecutorRunTime() { + return executorRunTime; + } + + public long getResultSize() { + return resultSize; + } + + public long getJvmGcTime() { + return jvmGcTime; + } + + public long getResultSerializationTime() { + return resultSerializationTime; + } + + public long getMemoryBytesSpilled() { + return memoryBytesSpilled; + } + + public long getDiskBytesSpilled() { + return diskBytesSpilled; + } + + public long getInputBytes() { + return inputBytes; + } + + public long getInputRecords() { + return inputRecords; + } + + public long getOutputBytes() { + return outputBytes; + } + + public long getOutputRecords() { + return outputRecords; + } + + public long getShuffleReadRecords() { + return shuffleReadRecords; + } + + public long getShuffleWriteBytes() { + return shuffleWriteBytes; + } + + public long getShuffleWriteRecords() { + return shuffleWriteRecords; + } + + public boolean isFailed() { + return failed; + } + + public long getShuffleReadRemoteBytes() { + return shuffleReadRemoteBytes; + } + + public long getShuffleReadLocalBytes() { + return shuffleReadLocalBytes; + } + + public void setFailed(boolean failed) { + this.failed = failed; + valueChanged("failed"); + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + valueChanged("taskId"); + } + + public void setLaunchTime(long launchTime) { + 
this.launchTime = launchTime; + valueChanged("launchTime"); + } + + public void setExecutorId(String executorId) { + this.executorId = executorId; + valueChanged("executorId"); + } + + public void setHost(String host) { + this.host = host; + this.valueChanged("host"); + } + + public void setTaskLocality(String taskLocality) { + this.taskLocality = taskLocality; + this.valueChanged("taskLocality"); + } + + public void setSpeculative(boolean speculative) { + this.speculative = speculative; + this.valueChanged("speculative"); + } + + public void setExecutorDeserializeTime(long executorDeserializeTime) { + this.executorDeserializeTime = executorDeserializeTime; + this.valueChanged("executorDeserializeTime"); + } + + public void setExecutorRunTime(long executorRunTime) { + this.executorRunTime = executorRunTime; + this.valueChanged("executorRunTime"); + } + + public void setResultSize(long resultSize) { + this.resultSize = resultSize; + this.valueChanged("resultSize"); + } + + public void setJvmGcTime(long jvmGcTime) { + this.jvmGcTime = jvmGcTime; + this.valueChanged("jvmGcTime"); + } + + public void setResultSerializationTime(long resultSerializationTime) { + this.resultSerializationTime = resultSerializationTime; + this.valueChanged("resultSerializationTime"); + } + + public void setMemoryBytesSpilled(long memoryBytesSpilled) { + this.memoryBytesSpilled = memoryBytesSpilled; + this.valueChanged("memoryBytesSpilled"); + } + + public void setDiskBytesSpilled(long diskBytesSpilled) { + this.diskBytesSpilled = diskBytesSpilled; + this.valueChanged("diskBytesSpilled"); + } + + public void setInputBytes(long inputBytes) { + this.inputBytes = inputBytes; + this.valueChanged("inputBytes"); + } + + public void setInputRecords(long inputRecords) { + this.inputRecords = inputRecords; + this.valueChanged("inputRecords"); + } + + public void setOutputBytes(long outputBytes) { + this.outputBytes = outputBytes; + this.valueChanged("outputBytes"); + } + + public void setOutputRecords(long outputRecords) { + this.outputRecords = outputRecords; + this.valueChanged("outputRecords"); + } + + + + public void setShuffleReadRecords(long shuffleReadRecords) { + this.shuffleReadRecords = shuffleReadRecords; + this.valueChanged("shuffleReadRecords"); + } + + public void setShuffleWriteBytes(long shuffleWriteBytes) { + this.shuffleWriteBytes = shuffleWriteBytes; + this.valueChanged("shuffleWriteBytes"); + } + + public void setShuffleWriteRecords(long shuffleWriteRecords) { + this.shuffleWriteRecords = shuffleWriteRecords; + this.valueChanged("shuffleWriteRecords"); + } + + public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) { + this.shuffleReadRemoteBytes = shuffleReadRemoteBytes; + this.valueChanged("shuffleReadRemoteBytes"); + } + + public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) { + this.shuffleReadLocalBytes = shuffleReadLocalBytes; + this.valueChanged("shuffleReadLocalBytes"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml new file mode 100644 index 0000000..97be7ec --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>eagle-jpm-parent</artifactId> + <groupId>org.apache.eagle</groupId> + <version>0.5.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>eagle-jpm-mr-history</artifactId> + <name>eagle-jpm-mr-history</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-api</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.wso2.orbit.com.lmax</groupId> + <artifactId>disruptor</artifactId> + </exclusion> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-base</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.wso2.orbit.com.lmax</groupId> + <artifactId>disruptor</artifactId> + </exclusion> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <plugins> + <plugin> + 
                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/assembly/eagle-jpm-mr-history-assembly.xml</descriptor>
+                    <finalName>eagle-jpm-mr-history-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml b/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
new file mode 100644
index 0000000..cf6d108
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <!--includes>
+                <include>org.apache.hadoop:hadoop-common</include>
+                <include>org.apache.hadoop:hadoop-hdfs</include>
+                <include>org.apache.hadoop:hadoop-client</include>
+                <include>org.apache.hadoop:hadoop-auth</include>
+                <include>org.apache.eagle:eagle-stream-process-api</include>
+                <include>org.apache.eagle:eagle-stream-process-base</include>
+                <include>org.jsoup:jsoup</include>
+            </includes-->
+            <excludes>
+                <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}/</directory>
+            <outputDirectory>/</outputDirectory>
+            <!--<includes>-->
+            <!--<include>*.conf</include>-->
+            <!--<include>*.xml</include>-->
+            <!--<include>*.properties</include>-->
+            <!--<include>*.config</include>-->
+            <!--<include>classes/META-INF/*</include>-->
+            <!--</includes>-->
+
+            <excludes>
+                <exclude>*.yaml</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
new file mode 100644
index 0000000..7c0530d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt;
+import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class MRHistoryJobMain {
+    public static void main(String []args) {
+        try {
+            //1. trigger init conf
+            JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args);
+
+            //2. init JobHistoryContentFilter
+            JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+            List<String> confKeyPatterns = jhfConfigManager.getConfig().getStringList("MRConfigureKeys");
+            confKeyPatterns.add(JPAConstants.JobConfiguration.CASCADING_JOB);
+            confKeyPatterns.add(JPAConstants.JobConfiguration.HIVE_JOB);
+            confKeyPatterns.add(JPAConstants.JobConfiguration.PIG_JOB);
+            confKeyPatterns.add(JPAConstants.JobConfiguration.SCOOBI_JOB);
+
+            for (String key : confKeyPatterns) {
+                builder.includeJobKeyPatterns(Pattern.compile(key));
+            }
+            JobHistoryContentFilter filter = builder.build();
+
+            //3. init topology
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            String topologyName = "mrHistoryJobTopology";
+            String spoutName = "mrHistoryJobExecutor";
+            String boltName = "updateProcessTime";
+            int parallelism = jhfConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+            int tasks = jhfConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+            if (parallelism > tasks) {
+                parallelism = tasks;
+            }
+            topologyBuilder.setSpout(
+                    spoutName,
+                    new JobHistorySpout(filter, jhfConfigManager),
+                    parallelism
+            ).setNumTasks(tasks);
+            topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName);
+
+            backtype.storm.Config config = new backtype.storm.Config();
+            config.setNumWorkers(jhfConfigManager.getConfig().getInt("envContextConfig.workers"));
+            config.put(Config.TOPOLOGY_DEBUG, true);
+            if (!jhfConfigManager.getEnv().equals("local")) {
+                //cluster mode
+                //parse conf here
+                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            } else {
+                //local mode
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
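
For illustration only, and not part of the commit above: the JobHistoryContentFilterBuilder wiring in MRHistoryJobMain compiles each configured key (the "MRConfigureKeys" list plus the JPAConstants.JobConfiguration constants) into a regex so that only matching job-configuration keys are kept for persistence. The standalone sketch below shows that filtering idea with plain java.util.regex; the key names are hypothetical stand-ins, and it does not use the actual Eagle filter classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class JobConfKeyFilterSketch {
    public static void main(String[] args) {
        // Stand-ins for the patterns built in MRHistoryJobMain; the literals are hypothetical.
        List<Pattern> includeKeyPatterns = Arrays.asList(
                Pattern.compile("mapreduce\\.job\\.queuename"),
                Pattern.compile("hive\\.query\\.string"));

        // A sample of job-configuration keys as they might appear in a job conf file.
        List<String> sampleJobConfKeys = Arrays.asList(
                "mapreduce.job.queuename", "io.sort.mb", "hive.query.string");

        // Keep only the keys that match at least one include pattern.
        List<String> kept = new ArrayList<String>();
        for (String key : sampleJobConfKeys) {
            for (Pattern pattern : includeKeyPatterns) {
                if (pattern.matcher(key).matches()) {
                    kept.add(key);
                    break;
                }
            }
        }
        System.out.println("kept keys: " + kept);  // prints [mapreduce.job.queuename, hive.query.string]
    }
}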
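Likewise for illustration only: a minimal sketch of how the SparkStage and SparkTask entities introduced in eagle-jpm-entity earlier in this diff might be populated through the setters shown above. The tag names mirror the @Tags declarations, but the concrete values, the sketch class name, and the setTags(...) call (assumed to be inherited from TaggedLogAPIEntity) are assumptions; the actual write path to Eagle's entity service is omitted. Each setter calls valueChanged(...), presumably so that only the fields actually set are flagged as modified when the entity is later serialized.

package org.apache.eagle.jpm.entity;  // hypothetical location, alongside the entities above

import java.util.HashMap;
import java.util.Map;

public class SparkEntitySketch {
    public static void main(String[] args) {
        SparkStage stage = new SparkStage();
        Map<String, String> tags = new HashMap<String, String>();
        tags.put("site", "sandbox");                // hypothetical tag values
        tags.put("sprkAppId", "application_0001");
        tags.put("stageId", "0");
        stage.setTags(tags);                        // assumed inherited from TaggedLogAPIEntity
        stage.setStatus("COMPLETE");
        stage.setNumTasks(200);
        stage.setExecutorRunTime(123456L);          // milliseconds
        stage.setSubmitTime(System.currentTimeMillis());

        SparkTask task = new SparkTask();
        task.setTaskId(1);
        task.setShuffleReadRemoteBytes(1024L);
        task.setShuffleReadLocalBytes(512L);
        // The entity tracks remote and local shuffle reads separately; the total is their sum.
        long totalShuffleRead = task.getShuffleReadRemoteBytes() + task.getShuffleReadLocalBytes();
        System.out.println("total shuffle read bytes = " + totalShuffleRead);
    }
}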
