http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java new file mode 100644 index 0000000..4e6bf03 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java @@ -0,0 +1,178 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.eagle.jpm.spark.history.storm; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig; +import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader; +import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo; +import org.apache.eagle.jpm.spark.history.crawl.SparkHistoryFileInputStreamReaderImpl; +import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager; +import org.apache.eagle.jpm.spark.history.status.ZKStateConstant; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.HDFSUtil; +import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher; +import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher; +import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication; +import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplicationAttempt; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class SparkJobParseBolt extends BaseRichBolt { + + private static final Logger LOG = LoggerFactory.getLogger(SparkJobParseBolt.class); + + private OutputCollector collector; + private ResourceFetcher historyServerFetcher; + private SparkHistoryCrawlConfig config; + private JobHistoryZKStateManager zkState; + private Configuration hdfsConf; + + public SparkJobParseBolt(SparkHistoryCrawlConfig config) { + this.config = config; + } + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { + this.collector = 
outputCollector; + this.hdfsConf = new Configuration(); + this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint); + this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true); + this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal); + this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab); + this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl, + config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd); + this.zkState = new JobHistoryZKStateManager(config); + } + + @Override + public void execute(Tuple tuple) { + String appId = tuple.getStringByField("appId"); + FileSystem hdfs = null; + try { + if (!zkState.hasApplication(appId)) { + //may already be processed due to some reason + collector.ack(tuple); + return; + } + + SparkApplicationInfo info = zkState.getApplicationInfo(appId); + //first try to get attempts under the application + List<SparkApplicationAttempt> attempts = this.getAttemptList(appId); + + if (attempts.isEmpty()) { + LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.", appId, info.getName(), info.getUser(), info.getQueue()); + } else { + hdfs = HDFSUtil.getFileSystem(this.hdfsConf); + for (SparkApplicationAttempt attempt : attempts) { + Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attempt.getAttemptId())); + JHFInputStreamReader reader = new SparkHistoryFileInputStreamReaderImpl(config.info.site , info); + reader.read(hdfs.open(attemptFile)); + } + } + + zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED); + LOG.info("Successfully parse application {}", appId); + collector.ack(tuple); + } catch (Exception e) { + LOG.error("Fail to process application {}", appId, e); + zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED); + collector.fail(tuple); + } finally { + if (null != hdfs) { + try { + 
hdfs.close(); + } catch (Exception e) { + LOG.error("Fail to close hdfs"); + } + } + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } + + private String getAppAttemptLogName(String appId, String attemptId) { + return String.format("%s_%s", appId, attemptId); + } + + private List<SparkApplicationAttempt> getAttemptList(String appId) throws IOException { + FileSystem hdfs = null; + List<SparkApplicationAttempt> attempts = new ArrayList<>(); + try { + + SparkApplication app = null; + /*try { + List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId); + if (apps != null) { + app = (SparkApplication) apps.get(0); + attempts = app.getAttempts(); + } + } catch (Exception e) { + LOG.warn("Fail to get application detail from history server for appId " + appId, e); + }*/ + + + if (null == app) { + //history server may not have the info, just double check + hdfs = HDFSUtil.getFileSystem(this.hdfsConf); + Integer attemptId = 1; + + boolean exists = true; + while (exists) { + Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attemptId.toString())); + if (hdfs.exists(attemptFile)) { + SparkApplicationAttempt attempt = new SparkApplicationAttempt(); + attempt.setAttemptId(attemptId.toString()); + attempts.add(attempt); + attemptId++; + } else { + exists = false; + } + } + } + return attempts; + } finally { + if (null != hdfs) { + hdfs.close(); + } + } + } + + @Override + public void cleanup() { + super.cleanup(); + zkState.close(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java new file mode 100644 index 0000000..eb30f5e --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java @@ -0,0 +1,47 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.eagle.jpm.spark.history.storm; + +import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig; +import org.apache.eagle.jpm.util.HDFSUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestHDFS { + + private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class); + public static void main(String[] args) throws Exception{ + SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig(); + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", config.hdfsConfig.endpoint); + conf.set("hdfs.kerberos.principal", config.hdfsConfig.principal); + conf.set("hdfs.keytab.file", config.hdfsConfig.keytab); + + FileSystem hdfs = HDFSUtil.getFileSystem(conf); + Path path = new Path("/logs/spark-events/local-1463002514438"); + boolean exists = hdfs.exists(path); + LOG.info("File exists:{}", exists); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 0000000..21686a6 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.hdfs.DistributedFileSystem +org.apache.hadoop.hdfs.web.HftpFileSystem +org.apache.hadoop.hdfs.web.HsftpFileSystem +org.apache.hadoop.hdfs.web.WebHdfsFileSystem +org.apache.hadoop.hdfs.web.SWebHdfsFileSystem \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf new file mode 100644 index 0000000..36f0836 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +{ + "basic":{ + "cluster":"sandbox", + "datacenter":"sandbox", + jobConf.additional.info: [] + }, + "eagleProps":{ + eagle.service.host:"sandbox.hortonworks.com", + eagle.service.port: 9099, + eagle.service.userName: "admin", + eagle.service.pwd : "secret", + eagle.service.read_timeout : 2 + }, + "dataSourceConfig":{ + "zkQuorum" : "sandbox.hortonworks.com:2181", + "zkRoot" : "/sparkJobHistory", + "zkSessionTimeoutMs" : 15000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 20000, + spark.history.server.url : "http://sandbox.hortonworks.com:18080/", + spark.history.server.username : "", + spark.history.server.pwd : "", + rm.url:["http://sandbox.hortonworks.com:8088"] , + "hdfs": { + "baseDir": "/logs/spark-events", + "endPoint": "hdfs://sandbox.hortonworks.com:8020", + "principal": "", + "keytab" : "" + } + }, + "storm":{ + "mode": "local", + "workerNo": 2, + "name":"sparkHistory", + "messageTimeoutSec": 3000, + "pendingSpout": 1000, + "spoutCrawlInterval": 10000,#in ms + "parallelismConfig" : { + "sparkHistoryJobSpout" : 1, + "sparkHistoryJobBolt" : 6 + }, + "tasks" : { + "sparkHistoryJobSpout" : 1, + "sparkHistoryJobBolt" : 6 + } + }, + "spark":{ + "defaultVal":{ + spark.executor.memory:"1g", + spark.driver.memory: "1g", + spark.driver.cores:1, + spark.executor.cores:1, + spark.yarn.am.memory:"512m", + spark.yarn.am.cores:1, + spark.yarn.executor.memoryOverhead.factor: 10, + spark.yarn.driver.memoryOverhead.factor: 10, + spark.yarn.am.memoryOverhead.factor: 10, + spark.yarn.overhead.min: "384m" + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties new file mode 100644 index 0000000..6b8c8d6 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +log4j.rootLogger=INFO, stdout, DRFA + +eagle.log.dir=../logs +eagle.log.file=eagle.log + +# standard output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n + +# Daily Rolling File Appender + log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender + log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} + log4j.appender.DRFA.DatePattern=.yyyy-MM-dd +## 30-day backup +# log4j.appender.DRFA.MaxBackupIndex=30 + log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml new file mode 100644 index 0000000..7bf90d4 --- /dev/null +++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-parent</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>eagle-jpm-spark-running</artifactId> + <name>eagle-jpm-spark-running</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-stream-process-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-job-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml new file mode 100644 index 0000000..ed7f658 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/pom.xml @@ 
-0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-parent</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>eagle-jpm-util</artifactId> + <name>eagle-jpm-util</name> + <url>http://maven.apache.org</url> + + <dependencies> + <dependency> + <groupId>com.googlecode.json-simple</groupId> + <artifactId>json-simple</artifactId> + <version>1.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${storm.version}</version> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + 
<artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.9</version> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java new file mode 100644 index 0000000..0792f15 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Shared string constants and small enums for the job-performance-monitoring modules:
 * Eagle service endpoint names, YARN ResourceManager / Spark history server REST paths,
 * and the enums used when fetching resources.
 */
public class Constants {

    // ---- Eagle service endpoint names -------------------------------------------------------
    public static final String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
    public static final String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
    public static final String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
    public static final String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
    public static final String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";

    // ---- Id prefixes ------------------------------------------------------------------------
    public static final String APPLICATION_PREFIX = "application";
    public static final String JOB_PREFIX = "job";

    // ---- REST paths and query fragments -----------------------------------------------------
    public static final String V2_APPS_URL = "ws/v1/cluster/apps";
    public static final String ANONYMOUS_PARAMETER = "anonymous=true";

    public static final String V2_APPS_RUNNING_URL = V2_APPS_URL + "?state=RUNNING";
    public static final String V2_APPS_COMPLETED_URL = V2_APPS_URL + "?state=FINISHED";

    public static final String SPARK_APPS_URL = "api/v1/applications";

    /** Compression applied to a fetched resource. */
    public enum CompressionType {
        GZIP,
        NONE
    }

    /** Job-state filter. */
    public enum JobState {
        RUNNING,
        COMPLETED,
        ALL
    }

    /** Kind of resource a fetcher is asked for. */
    public enum ResourceType {
        COMPLETE_SPARK_JOB,
        SPARK_JOB_DETAIL
    }

}
+ * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.jpm.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; + +public class HDFSUtil { + + public static FileSystem getFileSystem(Configuration conf) throws IOException { + HDFSUtil.login(conf); + return FileSystem.get(conf); + } + + public static void login(Configuration kConfig) throws IOException { + if(kConfig.get("hdfs.kerberos.principal") == null || kConfig.get("hdfs.kerberos.principal").isEmpty()){ + return; + } + kConfig.setBoolean("hadoop.security.authorization", true); + kConfig.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.setConfiguration(kConfig); + UserGroupInformation.loginUserFromKeytab(kConfig.get("hdfs.kerberos.principal"), kConfig.get("hdfs.keytab.file")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java new file mode 100644 index 0000000..8080147 --- /dev/null +++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.util; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +public class JSONUtil { + + public static String getString(JSONObject obj, String field) { + if (obj.containsKey(field)) { + return (String) obj.get(field); + } + return null; + } + + public static Integer getInt(JSONObject obj, String field){ + if(obj.containsKey(field)){ + return ((Long)obj.get(field)).intValue(); + } + return null; + } + + public static Long getLong(JSONObject obj, String field){ + if(obj.containsKey(field)){ + return (Long)obj.get(field); + } + return null; + } + + public static Boolean getBoolean(JSONObject obj, String field){ + if(obj.containsKey(field)){ + return (Boolean)obj.get(field); + } + return null; + } + + public static JSONObject getJSONObject(JSONObject obj, String field){ + if(obj.containsKey(field)){ + return (JSONObject)obj.get(field); + } + return null; + } + + public static JSONArray getJSONArray(JSONObject obj, String field){ + if(obj.containsKey(field)){ + return (JSONArray)obj.get(field); + } + return null; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java new file mode 100644 index 0000000..c5cc82f --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.util; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class JobNameNormalization { + private static Logger logger = LoggerFactory.getLogger(JobNameNormalization.class); + private static JobNameNormalization instance = new JobNameNormalization(); + private static final String JOB_NAME_NORMALIZATION_RULES_KEY = "job.name.normalization.rules.key"; + private static final String PARAMETERIZED_PREFIX = "\\$"; + private static final String MULTIPLE_RULE_DILIMITER = ";"; + /** + * map from source string to target string + * source string is regular expression, for example ^(.*)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$ + * target string is parameterized string, for example $1, $2 + */ + private List<JobNameNormalizationRule> _rules = new ArrayList<JobNameNormalizationRule>(); + + private enum NormalizationOp{ + REPLACE("=>"); + private String value; + private NormalizationOp(String value){ + this.value = value; + } + public String toString(){ + return value; + } + } + + static class JobNameNormalizationRule{ + Pattern pattern; + NormalizationOp op; + String target; + } + + private JobNameNormalization(){ + try{ + // load normalization rules + Config conf = ConfigFactory.load(); + String key = JOB_NAME_NORMALIZATION_RULES_KEY.toLowerCase(); + String value = conf.getString(key); + if(value == null){ + logger.info("no job name normalization rules are loaded"); + return; + } + // multiple rules are concatenated with semicolon, i.e. 
; + String rules[] = value.split(MULTIPLE_RULE_DILIMITER); + for(String rule : rules){ + rule = rule.trim(); + logger.info("jobNormalizationRule is loaded " + rule); + addRule(rule); + } + }catch(Exception ex){ + logger.error("fail loading job name normalization rules", ex); + throw new RuntimeException(ex); + } + } + + public static JobNameNormalization getInstance(){ + return instance; + } + + private void addRule(String rule){ + for(NormalizationOp op : NormalizationOp.values()){ + // split the rule to be source and target string + String elements[] = rule.split(op.toString()); + if(elements == null || elements.length != 2) return; + JobNameNormalizationRule r = new JobNameNormalizationRule(); + r.pattern = Pattern.compile(elements[0].trim()); + r.op = op; + r.target = elements[1].trim(); + _rules.add(r); + break; //once one Op is matched, exit + } + + } + + public String normalize(String jobName){ + String normalizedJobName = jobName; + // go through each rules and do actions + for(JobNameNormalizationRule rule : _rules){ + Pattern p = rule.pattern; + Matcher m = p.matcher(jobName); + if(m.find()){ + normalizedJobName = rule.target; + int c = m.groupCount(); + for(int i=1; i<c+1; i++){ + normalizedJobName = normalizedJobName.replaceAll(PARAMETERIZED_PREFIX+String.valueOf(i), m.group(i)); + } + } + } + return normalizedJobName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java new file mode 100644 index 0000000..35014b1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software 
/**
 * Holder for the status enumerations used by Spark entities.
 */
public class SparkEntityConstant {

    /** Status values for a Spark stage. */
    public enum SPARK_STAGE_STATUS {
        COMPLETE,
        FAILED
    }

    /** Status values for a Spark job. */
    public enum SPARK_JOB_STATUS {
        SUCCEEDED,
        FAILED
    }
}
/**
 * Tag names attached to Spark job entities.
 *
 * <p>{@link #toString()} returns the tag string given in the constant's declaration, not the
 * constant's Java name.
 */
public enum SparkJobTagName {
    SITE("site"),
    SPARK_APP_ID("sprkAppId"),
    SPARK_APP_ATTEMPT_ID("sprkAppAttemptId"),
    SPARK_APP_NAME("sprkAppName"),
    SPARK_APP_NORM_NAME("normSprkAppName"),
    SPARK_JOB_ID("jobId"),
    // NOTE(review): "SATGE" looks like a typo for "STAGE"; the name is kept because it is public.
    SPARK_SATGE_ID("stageId"),
    SPARK_STAGE_ATTEMPT_ID("stageAttemptId"),
    SPARK_TASK_INDEX("taskIndex"),
    SPARK_TASK_ATTEMPT_ID("taskAttemptId"),
    SPARK_USER("user"),
    SPARK_QUEUE("queue"),
    SPARK_EXECUTOR_ID("executorId");

    /** Tag string returned by {@link #toString()}. */
    private final String tagName;

    SparkJobTagName(String tagName) {
        this.tagName = tagName;
    }

    /**
     * @return the tag string of this constant
     */
    @Override
    public String toString() {
        return tagName;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package org.apache.eagle.jpm.util.resourceFetch; + +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils; +import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector; +import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelectorImpl; +import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo; +import org.apache.eagle.jpm.util.resourceFetch.model.AppsWrapper; +import org.apache.eagle.jpm.util.resourceFetch.url.JobListServiceURLBuilderImpl; +import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder; +import org.apache.eagle.jpm.util.resourceFetch.url.SparkCompleteJobServiceURLBuilderImpl; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +public class RMResourceFetcher implements ResourceFetcher{ + + private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class); + private final HAURLSelector selector; + private final ServiceURLBuilder jobListServiceURLBuilder; + private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder; + + private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); + + static { + 
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); + } + + public RMResourceFetcher(String[] RMBasePaths) { + this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl(); + this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl(); + + this.selector = new HAURLSelectorImpl(RMBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP); + } + + private void checkUrl() throws IOException { + if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) { + selector.reSelectUrl(); + } + } + + private List<Object> doFetchSparkFinishApplicationsList(String lastFinishTime) throws Exception { + List<AppInfo> result = null; + InputStream is = null; + try { + checkUrl(); + final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), lastFinishTime); + LOG.info("Going to call yarn api to fetch finished spark job list: " + urlString); + is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP); + final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class); + if (appWrapper != null && appWrapper.getApps() != null + && appWrapper.getApps().getApp() != null) { + result = appWrapper.getApps().getApp(); + return Arrays.asList((Object)result); + } + return null; + }finally { + if (is != null) { try {is.close();} catch (Exception e){} } + } + } + + + + public List<Object> getResource(Constants.ResourceType resoureType, Object... 
parameter) throws Exception{ + switch(resoureType) { + case COMPLETE_SPARK_JOB: + return doFetchSparkFinishApplicationsList((String)parameter[0]); + + default: + throw new Exception("Not support ressourceType :" + resoureType); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java new file mode 100644 index 0000000..b21d030 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.util.resourceFetch; + +import org.apache.eagle.jpm.util.Constants; + +import java.util.List; + +public interface ResourceFetcher { + + List<Object> getResource(Constants.ResourceType resoureType, Object... 
parameter) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java new file mode 100644 index 0000000..c13bee0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java @@ -0,0 +1,81 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.eagle.jpm.util.resourceFetch; + +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils; +import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication; +import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder; +import org.apache.eagle.jpm.util.resourceFetch.url.SparkJobServiceURLBuilderImpl; +import org.apache.commons.codec.binary.Base64; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +public class SparkHistoryServerResourceFetcher implements ResourceFetcher{ + + private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryServerResourceFetcher.class); + + private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); + + private String historyServerURL; + private final ServiceURLBuilder sparkDetailJobServiceURLBuilder; + private String auth; + + static { + OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); + } + + public SparkHistoryServerResourceFetcher(String historyServerURL, String userName, String pwd){ + this.historyServerURL = historyServerURL; + this.sparkDetailJobServiceURLBuilder = new SparkJobServiceURLBuilderImpl(); + this.auth = "Basic " + new String(new Base64().encode(String.format("%s:%s", userName, pwd).getBytes()));; + } + + private List<Object> doFetchSparkApplicationDetail(String appId) throws Exception { + InputStream is = null; + try { + final String urlString = sparkDetailJobServiceURLBuilder.build(this.historyServerURL, appId); + LOG.info("Going to call spark history server api to fetch spark job: " + urlString); + is = InputStreamUtils.getInputStream(urlString, auth, Constants.CompressionType.NONE); + SparkApplication app = OBJ_MAPPER.readValue(is, 
SparkApplication.class); + return Arrays.asList((Object)app); + } catch (FileNotFoundException e) { + return null; + } finally { + if (is != null) { try {is.close();} catch (Exception e) { } } + } + } + + public List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception{ + switch(resoureType) { + case SPARK_JOB_DETAIL: + return doFetchSparkApplicationDetail((String)parameter[0]); + default: + throw new Exception("Not support resourceType :" + resoureType); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java new file mode 100644 index 0000000..6d3fa45 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.util.resourceFetch.connection; + +import org.apache.eagle.jpm.util.Constants; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.util.zip.GZIPInputStream; + +public class InputStreamUtils { + + private static final int CONNECTION_TIMEOUT = 10 * 1000; + private static final int READ_TIMEOUT = 5 * 60 * 1000; + private static final String GZIP_HTTP_HEADER = "Accept-Encoding"; + private static final String GZIP_COMPRESSION = "gzip"; + + private static InputStream openGZIPInputStream(URL url, String auth, int timeout) throws IOException { + final URLConnection connection = url.openConnection(); + connection.setConnectTimeout(CONNECTION_TIMEOUT); + connection.setReadTimeout(timeout); + connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION); + if (null != auth){ + connection.setRequestProperty ("Authorization", auth); + } + return new GZIPInputStream(connection.getInputStream()); + } + + private static InputStream openInputStream(URL url, String auth, int timeout) throws IOException { + URLConnection connection = url.openConnection(); + connection.setConnectTimeout(timeout); + if (null != auth){ + connection.setRequestProperty ("Authorization", auth); + } + + return connection.getInputStream(); + } + + public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType, int timeout) throws Exception { + final URL url = URLConnectionUtils.getUrl(urlString); + if (compressionType.equals(Constants.CompressionType.GZIP)) { + return openGZIPInputStream(url, auth, timeout); + } + else { // CompressionType.NONE + return openInputStream(url, auth, timeout); + } + } + + public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType) throws Exception { + return 
getInputStream(urlString, auth, compressionType, READ_TIMEOUT); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java new file mode 100644 index 0000000..2e7b248 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.jpm.util.resourceFetch.connection; + +import org.apache.eagle.jpm.util.Constants; + +public class JobUtils { + + public static String checkAndAddLastSlash(String urlBase) { + if (!urlBase.endsWith("/")) { + return urlBase + "/"; + } + return urlBase; + } + + public static String getJobIDByAppID(String appID) { + if (appID.startsWith(Constants.APPLICATION_PREFIX)) { + return appID.replace(Constants.APPLICATION_PREFIX, Constants.JOB_PREFIX); + } + return null; + } + + public static String getAppIDByJobID(String jobID) { + if (jobID.startsWith(Constants.JOB_PREFIX)) { + return jobID.replace(Constants.JOB_PREFIX, Constants.APPLICATION_PREFIX); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java new file mode 100644 index 0000000..d340d7b --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Helpers that turn URL strings into {@link URL} / {@link URLConnection} objects.
 *
 * <p>SECURITY NOTE(review): the HTTPS helpers install a trust manager and a host name verifier
 * that accept every certificate and every host, and they install them as the JVM-wide defaults
 * of {@link HttpsURLConnection}. Every HTTPS connection made by this process afterwards is
 * unauthenticated. This behaviour is kept unchanged here because callers rely on it; it should
 * be reviewed.
 */
public final class URLConnectionUtils {
    //TODO: change some public method to private

    private static final String HTTPS_SCHEME = "https:";
    private static final String HTTP_SCHEME = "http:";

    /**
     * Opens a connection for an {@code https://} or {@code http://} URL.
     *
     * @param url URL string; the scheme is matched case-sensitively
     * @return an unconnected connection
     * @throws Exception if the URL starts with neither scheme or cannot be opened
     */
    public static URLConnection getConnection(String url) throws Exception {
        if (url.startsWith("https://")) {
            return getHTTPSConnection(url);
        } else if (url.startsWith("http://")) {
            return getHTTPConnection(url);
        }
        throw new Exception("Invalid input argument url: " + url);
    }

    /**
     * Opens a connection without touching any SSL defaults.
     *
     * @throws Exception if the URL is malformed or cannot be opened
     */
    public static URLConnection getHTTPConnection(String urlString) throws Exception {
        final URL url = new URL(urlString);
        return url.openConnection();
    }

    /**
     * Builds a {@link URL}, installing the trust-all SSL defaults first when the scheme is https.
     *
     * <p>The decision is taken from the URL's scheme, ignoring case and leading whitespace. It
     * used to be taken from whether the text merely contained "https" or "http", which let a URL
     * such as {@code ftp://http.example.com/} through and installed the trust-all defaults for
     * any plain-http URL that mentioned "https" in its host, path or query.
     *
     * @param urlString URL string with an http or https scheme
     * @return the parsed URL
     * @throws Exception if the scheme is neither http nor https, or the URL is malformed
     */
    public static URL getUrl(String urlString) throws Exception {
        final String candidate = urlString.trim();
        if (candidate.regionMatches(true, 0, HTTPS_SCHEME, 0, HTTPS_SCHEME.length())) {
            return getHTTPSUrl(urlString);
        }
        if (candidate.regionMatches(true, 0, HTTP_SCHEME, 0, HTTP_SCHEME.length())) {
            return getURL(urlString);
        }
        throw new Exception("Invalid input argument url: " + urlString);
    }

    /**
     * Parses a URL string without any scheme check.
     *
     * @throws MalformedURLException if the string is not a valid URL
     */
    public static URL getURL(String urlString) throws MalformedURLException {
        return new URL(urlString);
    }

    /**
     * Installs the trust-all socket factory and host name verifier as JVM-wide defaults, then
     * parses the URL.
     *
     * @throws MalformedURLException if the string is not a valid URL
     * @throws NoSuchAlgorithmException if no "SSL" context is available
     * @throws KeyManagementException if the SSL context cannot be initialised
     */
    public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException {
        // Create a trust manager that does not validate certificate chains
        final TrustManager[] trustAllCerts = new TrustManager[] {new TrustAllX509TrustManager()};
        // Install the all-trusting trust manager
        final SSLContext sc = SSLContext.getInstance("SSL");
        sc.init(null, trustAllCerts, new java.security.SecureRandom());
        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
        // Create all-trusting host name verifier
        final HostnameVerifier allHostsValid = new HostnameVerifier() {
            public boolean verify(String hostname, SSLSession session) {
                return true;
            }
        };
        HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
        return new URL(urlString);
    }

    /**
     * Same as {@link #getHTTPSUrl(String)} followed by {@code openConnection()}.
     */
    public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException {
        final URL url = getHTTPSUrl(urlString);
        return url.openConnection();
    }

    /** Trust manager whose checks never reject a certificate chain. */
    public static class TrustAllX509TrustManager implements X509TrustManager {
        private static final java.security.cert.X509Certificate[] NO_ISSUERS =
                new java.security.cert.X509Certificate[0];

        @Override
        public void checkClientTrusted(
                java.security.cert.X509Certificate[] chain, String authType)
                throws CertificateException {
            // accepts everything
        }

        @Override
        public void checkServerTrusted(
                java.security.cert.X509Certificate[] chain, String authType)
                throws CertificateException {
            // accepts everything
        }

        /**
         * @return an empty array; the interface contract asks for a non-null result
         */
        @Override
        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
            return NO_ISSUERS;
        }
    }
}
/**
 * Chooses one URL out of several candidates and allows the choice to be revised.
 */
public interface HAURLSelector {

    /**
     * Tests a URL.
     *
     * @param url URL to test
     * @return {@code true} when the implementation considers the URL usable
     */
    boolean checkUrl(String url);

    /**
     * Picks a URL again.
     *
     * @throws IOException if the implementation cannot settle on a URL
     */
    void reSelectUrl() throws IOException;

    /**
     * @return the URL currently chosen
     */
    String getSelectedUrl();
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.util.resourceFetch.ha; + +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils; +import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +public class HAURLSelectorImpl implements HAURLSelector { + + private final String[] urls; + private volatile String selectedUrl; + private final ServiceURLBuilder builder; + + private volatile boolean reselectInProgress; + private final Constants.CompressionType compressionType; + private static final long MAX_RETRY_TIME = 3; + private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class); + + public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) { + this.urls = urls; + this.compressionType = compressionType; + this.builder = builder; + } + + public boolean checkUrl(String urlString) { + InputStream is = null; + try { + is = InputStreamUtils.getInputStream(urlString, null, compressionType); + } + catch (Exception ex) { + LOG.info("get inputstream from url: " + urlString 
+ " failed. "); + return false; + } + finally { + if (is != null) { try { is.close(); } catch (IOException e) {/*Do nothing*/} } + } + return true; + } + + @Override + public String getSelectedUrl() { + if (selectedUrl == null) { + selectedUrl = urls[0]; + } + return selectedUrl; + } + + @Override + public void reSelectUrl() throws IOException { + if (reselectInProgress) return; + synchronized(this) { + if (reselectInProgress) return; + reselectInProgress = true; + try { + LOG.info("Going to reselect url"); + for (int i = 0; i < urls.length; i++) { + String urlToCheck = urls[i]; + LOG.info("Going to try url :" + urlToCheck); + for (int time = 0; time < MAX_RETRY_TIME; time++) { + if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) { + selectedUrl = urls[i]; + LOG.info("Successfully switch to new url : " + selectedUrl); + return; + } + LOG.info("try url " + urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try again. "); + try { + Thread.sleep(5 * 1000); + } + catch (InterruptedException ex) { /* Do Nothing */} + } + } + throw new IOException("No alive url found: "+ StringUtils.join(";", Arrays.asList(this.urls))); + } + finally { + reselectInProgress = false; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java new file mode 100644 index 0000000..463ce1e --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.eagle.jpm.util.resourceFetch.model;

import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;

/**
 * Jackson-bound bean describing a single application record.
 *
 * <p>As configured by the class-level annotations, {@code null} properties are
 * left out when the bean is serialized and unknown JSON properties are ignored
 * when it is deserialized.
 *
 * <p>State is reachable only through the accessors: the fields are private,
 * in line with the sibling {@code Applications} and {@code AppsWrapper} beans.
 *
 * <p>NOTE(review): the property set looks like one {@code app} element of the
 * YARN ResourceManager {@code ws/v1/cluster/apps} listing - confirm against the
 * fetcher that deserializes into this class. The units of the three time
 * properties are not established anywhere in this class.
 */
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AppInfo {

    // Identity and ownership.
    private String id;
    private String user;
    private String name;
    private String queue;

    // Status.
    private String state;
    private String finalStatus;
    private double progress;

    // Tracking and diagnostics.
    private String trackingUI;
    private String trackingUrl;
    private String diagnostics;
    private String clusterId;
    private String applicationType;

    // Timing (primitive, so these default to 0 rather than null).
    private long startedTime;
    private long finishedTime;
    private long elapsedTime;

    // Application-master locations.
    private String amContainerLogs;
    private String amHostHttpAddress;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getQueue() {
        return queue;
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getFinalStatus() {
        return finalStatus;
    }

    public void setFinalStatus(String finalStatus) {
        this.finalStatus = finalStatus;
    }

    public double getProgress() {
        return progress;
    }

    public void setProgress(double progress) {
        this.progress = progress;
    }

    public String getTrackingUI() {
        return trackingUI;
    }

    public void setTrackingUI(String trackingUI) {
        this.trackingUI = trackingUI;
    }

    public String getTrackingUrl() {
        return trackingUrl;
    }

    public void setTrackingUrl(String trackingUrl) {
        this.trackingUrl = trackingUrl;
    }

    public String getDiagnostics() {
        return diagnostics;
    }

    public void setDiagnostics(String diagnostics) {
        this.diagnostics = diagnostics;
    }

    public String getClusterId() {
        return clusterId;
    }

    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    public String getApplicationType() {
        return applicationType;
    }

    public void setApplicationType(String applicationType) {
        this.applicationType = applicationType;
    }

    public long getStartedTime() {
        return startedTime;
    }

    public void setStartedTime(long startedTime) {
        this.startedTime = startedTime;
    }

    public long getFinishedTime() {
        return finishedTime;
    }

    public void setFinishedTime(long finishedTime) {
        this.finishedTime = finishedTime;
    }

    public long getElapsedTime() {
        return elapsedTime;
    }

    public void setElapsedTime(long elapsedTime) {
        this.elapsedTime = elapsedTime;
    }

    public String getAmContainerLogs() {
        return amContainerLogs;
    }

    public void setAmContainerLogs(String amContainerLogs) {
        this.amContainerLogs = amContainerLogs;
    }

    public String getAmHostHttpAddress() {
        return amHostHttpAddress;
    }

    public void setAmHostHttpAddress(String amHostHttpAddress) {
        this.amHostHttpAddress = amHostHttpAddress;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java ---------------------------------------------------------------------- diff --git
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java new file mode 100644 index 0000000..741fa1d --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.jpm.util.resourceFetch.model; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.util.List; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class Applications { + + private List<AppInfo> app; + + public List<AppInfo> getApp() { + return app; + } + + public void setApp(List<AppInfo> app) { + this.app = app; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java new file mode 100644 index 0000000..d791685 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.jpm.util.resourceFetch.model; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class AppsWrapper { + + private Applications apps; + + public Applications getApps() { + return apps; + } + + public void setApps(Applications apps) { + this.apps = apps; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java new file mode 100644 index 0000000..5d25d84 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java @@ -0,0 +1,57 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.eagle.jpm.util.resourceFetch.model; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.util.List; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class SparkApplication { + String id; + String name; + List<SparkApplicationAttempt> attempts; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public List<SparkApplicationAttempt> getAttempts() { + return attempts; + } + + public void setAttempts(List<SparkApplicationAttempt> attempts) { + this.attempts = attempts; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java new file mode 100644 index 0000000..6e91c03 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java @@ -0,0 +1,73 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. 
You may obtain a copy of the License at
 * *
 * * http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 *
 */

package org.apache.eagle.jpm.util.resourceFetch.model;

import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.annotate.JsonSerialize;

/**
 * Jackson-bound bean for a single attempt of a Spark application.
 *
 * <p>Per the class-level annotations, {@code null} properties are omitted on
 * serialization and unknown JSON properties are ignored on deserialization.
 * {@code completed} is a primitive, so it is always serialized and defaults to
 * {@code false}.
 *
 * <p>The fields are private and reachable only through the accessors, in line
 * with the sibling {@code Applications} and {@code AppsWrapper} beans.
 *
 * <p>NOTE(review): {@code startTime} and {@code endTime} are kept as the raw
 * strings that were bound; their format is not established in this class.
 */
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkApplicationAttempt {
    private String attemptId;
    private String sparkUser;
    private String startTime;
    private String endTime;
    private boolean completed;

    public String getAttemptId() {
        return attemptId;
    }

    public void setAttemptId(String attemptId) {
        this.attemptId = attemptId;
    }

    public String getSparkUser() {
        return sparkUser;
    }

    public void setSparkUser(String sparkUser) {
        this.sparkUser = sparkUser;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public boolean isCompleted() {
        return completed;
    }

    public void setCompleted(boolean completed) {
        this.completed = completed;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java new file mode 100644 index 0000000..5508863 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java @@ -0,0 +1,38 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.eagle.jpm.util.resourceFetch.model; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class SparkApplicationWrapper { + + SparkApplication app; + + public SparkApplication getApp() { + return app; + } + + public void setApp(SparkApplication app) { + this.app = app; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java new file mode 100644 index 0000000..3e21e8d --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.jpm.util.resourceFetch.url; + +import org.apache.eagle.jpm.util.Constants; + +public class JobListServiceURLBuilderImpl implements ServiceURLBuilder { + + public String build(String ... parameters) { + // {rmUrl}/ws/v1/cluster/apps?state=RUNNING + String jobState = parameters[1]; + if (jobState.equals(Constants.JobState.RUNNING.name())) { + return parameters[0] + "/" + Constants.V2_APPS_RUNNING_URL + "&" + Constants.ANONYMOUS_PARAMETER; + } + else if (jobState.equals(Constants.JobState.COMPLETED.name())) { + return parameters[0] + "/" + Constants.V2_APPS_COMPLETED_URL + "&" + Constants.ANONYMOUS_PARAMETER; + } + else if (jobState.equals(Constants.JobState.ALL.name())) { + return parameters[0] + "/" + Constants.V2_APPS_URL + "&" + Constants.ANONYMOUS_PARAMETER; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java new file mode 100644 index 0000000..597e359 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.util.resourceFetch.url; + +public interface ServiceURLBuilder { + String build(String... parameters); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java new file mode 100644 index 0000000..d4d235f --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java @@ -0,0 +1,29 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. 
You may obtain a copy of the License at
 * *
 * * http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 *
 */

package org.apache.eagle.jpm.util.resourceFetch.url;

import org.apache.eagle.jpm.util.Constants;

/**
 * Builds the URL that lists finished Spark applications whose finish time is
 * at or after a given lower bound.
 */
public class SparkCompleteJobServiceURLBuilderImpl implements ServiceURLBuilder {

    /** Slots, in order: base URL, apps path, finishedTimeBegin value, anonymous parameter. */
    private static final String URL_FORMAT =
        "%s/%s?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=%s&%s";

    /**
     * Builds the listing URL.
     *
     * @param parameters {@code parameters[0]} is the base URL the apps path is
     *                   appended to; {@code parameters[1]} is inserted verbatim
     *                   (no URL encoding) as the {@code finishedTimeBegin}
     *                   query value
     * @return {@code {base}/{Constants.V2_APPS_URL}?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin={value}&{Constants.ANONYMOUS_PARAMETER}}
     * @throws IllegalArgumentException if fewer than two parameters are given
     */
    @Override
    public String build(String... parameters) {
        if (parameters == null || parameters.length < 2) {
            throw new IllegalArgumentException(
                "Expected parameters [baseUrl, finishedTimeBegin] but got "
                    + (parameters == null ? 0 : parameters.length) + " value(s)");
        }
        return String.format(
            URL_FORMAT,
            parameters[0],
            Constants.V2_APPS_URL,
            parameters[1],
            Constants.ANONYMOUS_PARAMETER);
    }
}
