http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java new file mode 100644 index 0000000..e357cf6 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.common; + +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.util.ConfigOptionParser; +import org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner; +import org.apache.eagle.jpm.mr.history.storm.JobIdPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class JHFConfigManager implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(JHFConfigManager.class); + + private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf"; + + public String getEnv() { + return env; + } + private String env; + + public ZKStateConfig getZkStateConfig() { return zkStateConfig; } + private ZKStateConfig zkStateConfig; + + public JobHistoryEndpointConfig getJobHistoryEndpointConfig() { return jobHistoryEndpointConfig; } + private JobHistoryEndpointConfig jobHistoryEndpointConfig; + + public ControlConfig getControlConfig() { return controlConfig; } + private ControlConfig controlConfig; + + public JobExtractorConfig getJobExtractorConfig() { return jobExtractorConfig; } + private JobExtractorConfig jobExtractorConfig; + + public EagleServiceConfig getEagleServiceConfig() { + return eagleServiceConfig; + } + private EagleServiceConfig eagleServiceConfig; + + public Config getConfig() { + return config; + } + private Config config; + + public static class ZKStateConfig implements Serializable { + public String zkQuorum; + public String zkRoot; + public int zkSessionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; + public String zkPort; + } + + public static class JobHistoryEndpointConfig implements Serializable { + public String nnEndpoint; + public String basePath; + public boolean pathContainsJobTrackerName; + public String jobTrackerName; + public String principal; + public String keyTab; + } + + public static class ControlConfig implements Serializable { + public boolean dryRun; + public Class<? 
extends JobIdPartitioner> partitionerCls;
+        public boolean zeroBasedMonth;
+        public String timeZone;
+    }
+
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public String mrVersion;
+        public int readTimeoutSeconds;
+    }
+
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public String username;
+        public String password;
+    }
+
+    private static JHFConfigManager manager = new JHFConfigManager();
+
+    /**
+     * Because this singleton is constructed while the class is being initialized,
+     * any exception thrown in this constructor is wrapped in java.lang.ExceptionInInitializerError,
+     * which is unrecoverable and hard to troubleshoot.
+     */
+    private JHFConfigManager() {
+        this.zkStateConfig = new ZKStateConfig();
+        this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
+        this.controlConfig = new ControlConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.eagleServiceConfig = new EagleServiceConfig();
+    }
+
+    public static JHFConfigManager getInstance(String[] args) {
+        manager.init(args);
+        return manager;
+    }
+
+    /**
+     * Read the configuration file and load HBase and other configurations.
+     */
+    private void init(String[] args) {
+        // TODO: Probably we can remove the properties file path check in future
+        try {
+            LOG.info("Loading from configuration file");
+            this.config = new ConfigOptionParser().load(args);
+        } catch (Exception e) {
+            LOG.error("failed to load config", e);
+        }
+
+        this.env = config.getString("envContextConfig.env");
+
+        //parse eagle job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
+        this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
+        //parse eagle zk
+        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
+        //parse job history endpoint
+        this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
+        this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
+        this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
+        this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
+        this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+
+        //parse control config
+        this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun");
+        try {
+            this.controlConfig.partitionerCls = (Class<? 
extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls")); + assert this.controlConfig.partitionerCls != null; + } catch (Exception e) { + LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner", e); + this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class; + } finally { + LOG.info("Loaded partitioner class: {}",this.controlConfig.partitionerCls); + } + this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth"); + this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone"); + + // parse eagle service endpoint + this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host"); + String port = config.getString("eagleProps.eagleService.port"); + this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port)); + this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username"); + this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password"); + + LOG.info("Successfully initialized JHFConfigManager"); + LOG.info("env: " + this.env); + LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum); + LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort); + LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost); + LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort); + } +}
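For context, a minimal sketch of how a caller might obtain this singleton and read the parsed sub-configurations; the wrapper class name and the printed fields are illustrative assumptions, not part of this patch:

import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;

public class JHFConfigManagerUsageSketch {
    public static void main(String[] args) {
        // args are forwarded to ConfigOptionParser, which loads the typesafe Config
        JHFConfigManager configManager = JHFConfigManager.getInstance(args);
        JHFConfigManager.JobExtractorConfig extractor = configManager.getJobExtractorConfig();
        JHFConfigManager.ZKStateConfig zk = configManager.getZkStateConfig();
        System.out.println("site=" + extractor.site
                + ", zkQuorum=" + zk.zkQuorum
                + ", env=" + configManager.getEnv());
    }
}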
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java new file mode 100755 index 0000000..feb5498 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JPAConstants { + + private final static Logger LOG = LoggerFactory.getLogger(JPAConstants.class); + + public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService"; + public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService"; + public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService"; + + public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService"; + public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService"; + public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService"; + public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService"; + public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService"; + + public static final String JOB_TASK_TYPE_TAG = "taskType"; + + public static class JobConfiguration { + // job type + public static final String SCOOBI_JOB = "scoobi.mode"; + public static final String HIVE_JOB = "hive.query.string"; + public static final String PIG_JOB = "pig.script"; + public static final String CASCADING_JOB = "cascading.app.name"; + } + + /** + * MR task types + */ + public enum TaskType { + SETUP, MAP, REDUCE, CLEANUP + } + + public enum JobType { + CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"), + NOTAVALIABLE("N/A") + ; + private String value; + JobType(String value){ + this.value = value; + } + @Override + public String toString() { + return this.value; + } + } + + public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter"; + public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter"; + + public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter"; + public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter"; + + public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = 
"MapTaskAttemptFileSystemCounter"; + public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter"; + + public enum TaskAttemptCounter { + TASK_ATTEMPT_DURATION, + } + + + + private static final String DEFAULT_JOB_CONF_NORM_JOBNAME_KEY = "eagle.job.name"; + private static final String EAGLE_NORM_JOBNAME_CONF_KEY = "eagle.job.normalizedfieldname"; + + public static String JOB_CONF_NORM_JOBNAME_KEY = null; + + static { + if (JOB_CONF_NORM_JOBNAME_KEY == null) { + JOB_CONF_NORM_JOBNAME_KEY = DEFAULT_JOB_CONF_NORM_JOBNAME_KEY; + } + LOG.info("Loaded " + EAGLE_NORM_JOBNAME_CONF_KEY + " : " + JOB_CONF_NORM_JOBNAME_KEY); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java new file mode 100644 index 0000000..f85f7bc --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.common; + +import java.util.Map; +import java.util.TreeMap; + +public final class JobConfig { + private Map<String, String> config = new TreeMap<>(); + + public Map<String, String> getConfig() { + return config; + } + + public void setConfig(Map<String, String> config) { + this.config = config; + } + + public String toString(){ + return config.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java new file mode 100644 index 0000000..5b330fc --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Timer; +import java.util.TimerTask; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * job history is the resource + */ +public abstract class AbstractJobHistoryDAO implements JobHistoryLCM { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJobHistoryDAO.class); + + private final static String YEAR_URL_FORMAT = "/%4d"; + private final static String MONTH_URL_FORMAT = "/%02d"; + private final static String DAY_URL_FORMAT = "/%02d"; + private final static String YEAR_MONTH_DAY_URL_FORMAT = YEAR_URL_FORMAT + MONTH_URL_FORMAT + DAY_URL_FORMAT; + protected final static String SERIAL_URL_FORMAT = "/%06d"; + protected final static String FILE_URL_FORMAT = "/%s"; + private static final Pattern JOBTRACKERNAME_PATTERN = Pattern.compile("^.*_(\\d+)_$"); + protected static final Pattern JOBID_PATTERN = Pattern.compile("job_\\d+_\\d+"); + + protected final String m_basePath; + protected volatile String m_jobTrackerName; + + public static final String JOB_CONF_POSTFIX = "_conf.xml"; + + private final static Timer timer = new Timer(true); + private final static long JOB_TRACKER_SYNC_DURATION = 10 * 60 * 1000; // 10 minutes + + private boolean m_pathContainsJobTrackerName; + + public AbstractJobHistoryDAO(String basePath, boolean pathContainsJobTrackerName, String startingJobTrackerName) throws Exception { + m_basePath = basePath; + m_pathContainsJobTrackerName = pathContainsJobTrackerName; + m_jobTrackerName = startingJobTrackerName; + if (m_pathContainsJobTrackerName) { + if (startingJobTrackerName == null || startingJobTrackerName.isEmpty()) + throw new IllegalStateException("startingJobTrackerName should not be null or empty"); + // start background thread to check what is current job tracker + startThread(m_basePath); + } + } + + protected String buildWholePathToYearMonthDay(int year, int month, int day) { + StringBuilder sb = new StringBuilder(); + sb.append(m_basePath); + if (!m_pathContainsJobTrackerName && m_jobTrackerName != null && !m_jobTrackerName.isEmpty()) { + sb.append("/"); + sb.append(m_jobTrackerName); + } + sb.append(String.format(YEAR_MONTH_DAY_URL_FORMAT, year, month, day)); + return sb.toString(); + } + + protected String buildWholePathToSerialNumber(int year, int month, int day, int serialNumber) { + String wholePathToYearMonthDay = buildWholePathToYearMonthDay(year, month, day); + StringBuilder sb = new StringBuilder(); + sb.append(wholePathToYearMonthDay); + sb.append(String.format(SERIAL_URL_FORMAT, serialNumber)); + return sb.toString(); + } + + protected String buildWholePathToJobHistoryFile(int year, int month, int day, int serialNumber, String jobHistoryFileName) { + String wholePathToJobHistoryFile = 
buildWholePathToSerialNumber(year, month, day, serialNumber); + StringBuilder sb = new StringBuilder(); + sb.append(wholePathToJobHistoryFile); + sb.append(String.format(FILE_URL_FORMAT, jobHistoryFileName)); + return sb.toString(); + } + + + protected String buildWholePathToJobConfFile(int year, int month, int day, int serialNumber,String jobHistFileName) { + Matcher matcher = JOBID_PATTERN.matcher(jobHistFileName); + if (matcher.find()) { + String wholePathToJobConfFile = buildWholePathToSerialNumber(year, month, day, serialNumber); + StringBuilder sb = new StringBuilder(); + sb.append(wholePathToJobConfFile); + sb.append("/"); + sb.append(String.format(FILE_URL_FORMAT, matcher.group())); + sb.append(JOB_CONF_POSTFIX); + return sb.toString(); + } + LOG.warn("Illegal job history file name: "+jobHistFileName); + return null; + } + + private void startThread(final String basePath) throws Exception { + LOG.info("start an every-" + JOB_TRACKER_SYNC_DURATION / (60 * 1000) + "min timer task to check current jobTrackerName in background"); + // Automatically update current job tracker name in background every 30 minutes + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + try { + LOG.info("regularly checking current jobTrackerName in background"); + final String _jobTrackerName = calculateJobTrackerName(basePath); + if (_jobTrackerName != null && !_jobTrackerName.equals(m_jobTrackerName)) { + LOG.info("jobTrackerName changed from " + m_jobTrackerName +" to " + _jobTrackerName); + m_jobTrackerName = _jobTrackerName; + } + LOG.info("Current jobTrackerName is: " + m_jobTrackerName); + } catch (Exception e) { + LOG.error("failed to figure out current job tracker name that is not configured due to: " + e.getMessage(), e); + } catch (Throwable t) { + LOG.error("failed to figure out current job tracker name that is not configured due to: " + t.getMessage(), t); + } + } + }, JOB_TRACKER_SYNC_DURATION, JOB_TRACKER_SYNC_DURATION); + } + + + @Override + public void readFileContent(int year, int month, int day, int serialNumber, String jobHistoryFileName, JHFInputStreamCallback reader) throws Exception { + InputStream downloadIs; + try { + downloadIs = getJHFFileContentAsStream(year, month, day, serialNumber, jobHistoryFileName); + } catch (FileNotFoundException ex) { + LOG.error("job history file not found " + jobHistoryFileName+", ignore and will NOT process any more"); + return; + } + + InputStream downloadJobConfIs = null; + try { + downloadJobConfIs = getJHFConfContentAsStream(year, month, day, serialNumber, jobHistoryFileName); + } catch (FileNotFoundException ex) { + LOG.warn("job configuration file of "+ jobHistoryFileName+" not found , ignore and use empty configuration"); + } + + org.apache.hadoop.conf.Configuration conf = null; + + if (downloadJobConfIs != null) { + conf = new org.apache.hadoop.conf.Configuration(); + conf.addResource(downloadJobConfIs); + } + + try { + if (downloadIs != null) { + reader.onInputStream(downloadIs, conf); + } + } catch (Exception ex) { + LOG.error("fail reading job history file", ex); + throw ex; + } catch(Throwable t) { + LOG.error("fail reading job history file", t); + throw new Exception(t); + } finally { + try { + if(downloadJobConfIs != null) { + downloadJobConfIs.close(); + } + if (downloadIs != null) { + downloadIs.close(); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + } + + + protected static long parseJobTrackerNameTimestamp(String jtname) { + Matcher matcher = JOBTRACKERNAME_PATTERN.matcher(jtname); 
+ if (matcher.find()) { + return Long.parseLong(matcher.group(1)); + } + LOG.warn("invalid job tracker name: " + jtname); + return -1; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java new file mode 100644 index 0000000..ff0c8c8 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.apache.eagle.dataproc.core.EagleOutputCollector; +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.parser.JHFParserBase; +import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { + private static final Logger LOG = LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class); + + + private JobHistoryContentFilter m_filter; + private EagleOutputCollector m_eagleCollector; + private JHFConfigManager m_configManager; + + public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) { + this.m_filter = filter; + this.m_configManager = configManager; + this.m_eagleCollector = eagleCollector; + } + + @Override + public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception { + final JHFConfigManager.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig(); + @SuppressWarnings("serial") + Map<String, String> baseTags = new HashMap<String, String>() { { + put("site", jobExtractorConfig.site); + } }; + + if (!m_filter.acceptJobFile()) { + // close immediately if we don't need job file + jobFileInputStream.close(); + } else { + //get parser and parse, do not need to emit data now + JHFParserBase parser = JHFParserFactory.getParser(m_configManager, + baseTags, + conf, m_filter); + parser.parse(jobFileInputStream); + jobFileInputStream.close(); + } + } +} 
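As a side note on the directory layout implied by the format strings in AbstractJobHistoryDAO above, here is a small self-contained sketch of the paths they produce; the base path, date, serial number, and file name are made-up examples, not values from this patch:

public class JobHistoryPathSketch {
    public static void main(String[] args) {
        String basePath = "/mr-history/done";   // hypothetical mapreduce.jobhistory.done-dir
        // YEAR_MONTH_DAY_URL_FORMAT = "/%4d" + "/%02d" + "/%02d"
        String dayPath = basePath + String.format("/%4d/%02d/%02d", 2016, 7, 15);
        // SERIAL_URL_FORMAT = "/%06d", FILE_URL_FORMAT = "/%s"
        String serialPath = dayPath + String.format("/%06d", 1);
        String filePath = serialPath + String.format("/%s", "job_1467355000000_0001.jhist");
        System.out.println(filePath);
        // prints: /mr-history/done/2016/07/15/000001/job_1467355000000_0001.jhist
    }
}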
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java new file mode 100644 index 0000000..3edde5b --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +public interface JHFCrawlerDriver { + /** + * return -1 if failed or there is no file to crawl + * return modified time of the file if succeed + */ + long crawl() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java new file mode 100644 index 0000000..8445434 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.storm.JobIdFilter; +import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * single thread crawling per driver + * multiple drivers can achieve parallelism + * + */ +public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { + private static final Logger LOG = LoggerFactory.getLogger(JHFCrawlerDriverImpl.class); + + private final static int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10; + private final static String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d"; + private final static Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})"); + + private static final int INITIALIZED = 0x0; + private static final int TODAY = 0x1; + private static final int BEFORETODAY = 0x10; + private final int PROCESSED_JOB_KEEP_DAYS = 5; + + private int m_flag = INITIALIZED; // 0 not set, 1 TODAY, 2 BEFORETODAY + private Deque<Pair<Long, String> > m_processQueue = new LinkedList<>(); + private Set<String> m_processedJobFileNames = new HashSet<>(); + + private final JobProcessDate m_proceeDate = new JobProcessDate(); + private boolean m_dryRun; + private JHFInputStreamCallback m_reader; + protected boolean m_zeroBasedMonth = true; + + private JobHistoryZKStateLCM m_zkStatelcm; + private JobHistoryLCM m_jhfLCM; + private JobIdFilter m_jobFilter; + private int m_partitionId; + private TimeZone m_timeZone; + + public JHFCrawlerDriverImpl(JHFConfigManager.JobHistoryEndpointConfig jobHistoryConfig, + JHFConfigManager.ControlConfig controlConfig, JHFInputStreamCallback reader, + JobHistoryZKStateLCM zkStateLCM, + JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { + this.m_zeroBasedMonth = controlConfig.zeroBasedMonth; + this.m_dryRun = controlConfig.dryRun; + if (this.m_dryRun) LOG.info("this is a dry run"); + this.m_reader = reader; + m_jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig); + this.m_zkStatelcm = zkStateLCM; + this.m_partitionId = partitionId; + this.m_jobFilter = jobFilter; + m_timeZone = TimeZone.getTimeZone(controlConfig.timeZone); + } + + /** + * <br> + * 1. if queue is not empty <br> + * 1.1 dequeue and process one job file <br> + * 1.2 store processed job file and also cache it to processedJobFileNames + * 2. 
if queue is empty <br>
+     * 2.0 if flag is BEFORETODAY, then write currentProcessedDate to jobProcessedDate as this day's data are all processed <br>
+     * 2.1 crawl that day's job file list <br>
+     * 2.2 filter out those job IDs which are already in m_processedJobFileNames keyed by currentProcessedDate <br>
+     * 2.3 put the available file list into processQueue and then go to step 1
+     */
+    @Override
+    public long crawl() throws Exception {
+        LOG.info("queue size is " + m_processQueue.size());
+        while (m_processQueue.isEmpty()) {
+            // read lastProcessedDate only when it's initialized
+            if (m_flag == INITIALIZED) {
+                readAndCacheLastProcessedDate();
+            }
+            if (m_flag == BEFORETODAY) {
+                updateProcessDate();
+                clearProcessedJobFileNames();
+            }
+            if (m_flag != TODAY) { // advance one day if initialized or BEFORE today
+                advanceOneDay();
+            }
+
+            if (isToday()) {
+                m_flag = TODAY;
+            } else {
+                m_flag = BEFORETODAY;
+            }
+
+            List<String> serialNumbers = m_jhfLCM.readSerialNumbers(this.m_proceeDate.year, getActualMonth(m_proceeDate.month), this.m_proceeDate.day);
+            List<Pair<Long, String> > allJobHistoryFiles = new LinkedList<>();
+            for (String serialNumber : serialNumbers) {
+                List<Pair<Long, String> > jobHistoryFiles = m_jhfLCM.readFileNames(
+                        this.m_proceeDate.year,
+                        getActualMonth(m_proceeDate.month),
+                        this.m_proceeDate.day,
+                        Integer.parseInt(serialNumber));
+                LOG.info("total number of job history files " + jobHistoryFiles.size());
+                for (Pair<Long, String> jobHistoryFile : jobHistoryFiles) {
+                    if (m_jobFilter.accept(jobHistoryFile.getRight()) && !fileProcessed(jobHistoryFile.getRight())) {
+                        allJobHistoryFiles.add(jobHistoryFile);
+                    }
+                }
+                jobHistoryFiles.clear();
+                LOG.info("after filtering, number of job history files " + allJobHistoryFiles.size());
+            }
+
+            Collections.sort(allJobHistoryFiles,
+                    new Comparator<Pair<Long, String>>() {
+                        @Override
+                        public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
+                            return o1.getLeft().compareTo(o2.getLeft());
+                        }
+                    }
+            );
+            for (Pair<Long, String> jobHistoryFile : allJobHistoryFiles) {
+                m_processQueue.add(jobHistoryFile);
+            }
+
+            allJobHistoryFiles.clear();
+
+            if (m_processQueue.isEmpty()) {
+                Thread.sleep(SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY * 1000);
+            } else {
+                LOG.info("queue size after populating is now : " + m_processQueue.size());
+            }
+        }
+        // start to process job history file
+        Pair<Long, String> item = m_processQueue.pollFirst();
+        String jobHistoryFile = item.getRight();
+        Long modifiedTime = item.getLeft();
+        if (jobHistoryFile == null) { // terminate this round of crawling when the queue is empty
+            LOG.info("process queue is empty, ignore this round");
+            return -1;
+        }
+        // get serialNumber from job history file name
+        Pattern p = Pattern.compile("^job_[0-9]+_([0-9]+)[0-9]{3}[_-]{1}");
+        Matcher m = p.matcher(jobHistoryFile);
+        String serialNumber;
+        if (m.find()) {
+            serialNumber = m.group(1);
+        } else {
+            LOG.warn("illegal job history file name : " + jobHistoryFile);
+            return -1;
+        }
+        if (!m_dryRun) {
+            m_jhfLCM.readFileContent(
+                    m_proceeDate.year,
+                    getActualMonth(m_proceeDate.month),
+                    m_proceeDate.day,
+                    Integer.valueOf(serialNumber),
+                    jobHistoryFile,
+                    m_reader);
+        }
+        m_zkStatelcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
+                this.m_proceeDate.year,
+                this.m_proceeDate.month + 1,
+                this.m_proceeDate.day),
+                jobHistoryFile);
+        m_processedJobFileNames.add(jobHistoryFile);
+
+        return modifiedTime;
+    }
+
+    private void updateProcessDate() throws Exception {
+        String 
line = String.format(FORMAT_JOB_PROCESS_DATE, this.m_proceeDate.year, + this.m_proceeDate.month + 1, this.m_proceeDate.day); + m_zkStatelcm.updateProcessedDate(m_partitionId, line); + } + + private int getActualMonth(int month){ + return m_zeroBasedMonth ? m_proceeDate.month : m_proceeDate.month + 1; + } + + private static class JobProcessDate { + public int year; + public int month; // 0 based month + public int day; + } + + private void clearProcessedJobFileNames() { + m_processedJobFileNames.clear(); + } + + private void readAndCacheLastProcessedDate() throws Exception { + String lastProcessedDate = m_zkStatelcm.readProcessedDate(m_partitionId); + Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate); + if (m.find() && m.groupCount() == 3) { + this.m_proceeDate.year = Integer.parseInt(m.group(1)); + this.m_proceeDate.month = Integer.parseInt(m.group(2)) - 1; // zero based month + this.m_proceeDate.day = Integer.parseInt(m.group(3)); + } else { + throw new IllegalStateException("job lastProcessedDate must have format YYYYMMDD " + lastProcessedDate); + } + + GregorianCalendar cal = new GregorianCalendar(m_timeZone); + cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0); + cal.add(Calendar.DATE, 1); + List<String> list = m_zkStatelcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), + cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))); + if (list != null) { + this.m_processedJobFileNames = new HashSet<>(list); + } + } + + private void advanceOneDay() throws Exception { + GregorianCalendar cal = new GregorianCalendar(m_timeZone); + cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0); + cal.add(Calendar.DATE, 1); + this.m_proceeDate.year = cal.get(Calendar.YEAR); + this.m_proceeDate.month = cal.get(Calendar.MONTH); + this.m_proceeDate.day = cal.get(Calendar.DAY_OF_MONTH); + + try { + clearProcessedJob(cal); + } catch (Exception e) { + LOG.error("failed to clear processed job ", e); + } + + } + + private void clearProcessedJob(Calendar cal) { + // clear all already processed jobs some days before current processing date (PROCESSED_JOB_KEEP_DAYS) + cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS); + String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR), + cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)); + m_zkStatelcm.truncateProcessedJob(line); + } + + private boolean isToday() { + GregorianCalendar today = new GregorianCalendar(m_timeZone); + + if (today.get(Calendar.YEAR) == this.m_proceeDate.year + && today.get(Calendar.MONTH) == this.m_proceeDate.month + && today.get(Calendar.DAY_OF_MONTH) == this.m_proceeDate.day) + return true; + + return false; + } + + /** + * check if this file was already processed + * + * @param fileName + * @return + */ + private boolean fileProcessed(String fileName) { + if (m_processedJobFileNames.contains(fileName)) + return true; + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java new file mode 100644 index 0000000..52a62a4 --- /dev/null +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.apache.hadoop.conf.Configuration; + +import java.io.InputStream; +import java.io.Serializable; + +/** + * callback when job history file input stream is ready + */ +public interface JHFInputStreamCallback extends Serializable { + /** + * this is called when job file string and job configuration file is ready + * @param is + * @param configuration + * @throws Exception + */ + void onInputStream(InputStream is, Configuration configuration) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java new file mode 100644 index 0000000..6fbf3d3 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import java.io.Serializable; +import java.util.List; +import java.util.regex.Pattern; + +/** + * define what content in job history stream should be streamed + * @author yonzhang + * + */ +public interface JobHistoryContentFilter extends Serializable { + boolean acceptJobFile(); + boolean acceptJobConfFile(); + List<Pattern> getMustHaveJobConfKeyPatterns(); + List<Pattern> getJobConfKeyInclusionPatterns(); + List<Pattern> getJobConfKeyExclusionPatterns(); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java new file mode 100644 index 0000000..43234c2 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + +public class JobHistoryContentFilterBuilder { + private final static Logger LOG = LoggerFactory.getLogger(JobHistoryContentFilterBuilder.class); + + private boolean m_acceptJobFile; + private boolean m_acceptJobConfFile; + private List<Pattern> m_mustHaveJobConfKeyPatterns; + private List<Pattern> m_jobConfKeyInclusionPatterns; + private List<Pattern> m_jobConfKeyExclusionPatterns; + + public static JobHistoryContentFilterBuilder newBuilder(){ + return new JobHistoryContentFilterBuilder(); + } + + public JobHistoryContentFilterBuilder acceptJobFile() { + this.m_acceptJobFile = true; + return this; + } + + public JobHistoryContentFilterBuilder acceptJobConfFile() { + this.m_acceptJobConfFile = true; + return this; + } + + public JobHistoryContentFilterBuilder mustHaveJobConfKeyPatterns(Pattern ...patterns) { + m_mustHaveJobConfKeyPatterns = Arrays.asList(patterns); + if (m_jobConfKeyInclusionPatterns != null) { + List<Pattern> list = new ArrayList<Pattern>(); + list.addAll(m_jobConfKeyInclusionPatterns); + list.addAll(Arrays.asList(patterns)); + m_jobConfKeyInclusionPatterns = list; + } + else + m_jobConfKeyInclusionPatterns = Arrays.asList(patterns); + return this; + } + + public JobHistoryContentFilterBuilder includeJobKeyPatterns(Pattern ... 
patterns) { + if (m_jobConfKeyInclusionPatterns != null) { + List<Pattern> list = new ArrayList<Pattern>(); + list.addAll(m_jobConfKeyInclusionPatterns); + list.addAll(Arrays.asList(patterns)); + m_jobConfKeyInclusionPatterns = list; + } else + m_jobConfKeyInclusionPatterns = Arrays.asList(patterns); + return this; + } + + public JobHistoryContentFilterBuilder excludeJobKeyPatterns(Pattern ...patterns) { + m_jobConfKeyExclusionPatterns = Arrays.asList(patterns); + return this; + } + + public JobHistoryContentFilter build() { + JobHistoryContentFilterImpl filter = new JobHistoryContentFilterImpl(); + filter.setAcceptJobFile(m_acceptJobFile); + filter.setAcceptJobConfFile(m_acceptJobConfFile); + filter.setMustHaveJobConfKeyPatterns(m_mustHaveJobConfKeyPatterns); + filter.setJobConfKeyInclusionPatterns(m_jobConfKeyInclusionPatterns); + filter.setJobConfKeyExclusionPatterns(m_jobConfKeyExclusionPatterns); + LOG.info("job history content filter:" + filter); + return filter; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java new file mode 100644 index 0000000..d8a482b --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import java.util.List; +import java.util.regex.Pattern; + +public class JobHistoryContentFilterImpl implements JobHistoryContentFilter { + private boolean m_acceptJobFile; + private boolean m_acceptJobConfFile; + private List<Pattern> m_mustHaveJobConfKeyPatterns; + private List<Pattern> m_jobConfKeyInclusionPatterns; + private List<Pattern> m_jobConfKeyExclusionPatterns; + + @Override + public boolean acceptJobFile() { + return m_acceptJobFile; + } + + @Override + public boolean acceptJobConfFile() { + return m_acceptJobConfFile; + } + + @Override + public List<Pattern> getMustHaveJobConfKeyPatterns() { + return m_mustHaveJobConfKeyPatterns; + } + + @Override + public List<Pattern> getJobConfKeyInclusionPatterns() { + return m_jobConfKeyInclusionPatterns; + } + + @Override + public List<Pattern> getJobConfKeyExclusionPatterns() { + return m_jobConfKeyExclusionPatterns; + } + + public void setAcceptJobFile(boolean acceptJobFile) { + this.m_acceptJobFile = acceptJobFile; + } + + public void setAcceptJobConfFile(boolean acceptJobConfFile) { + this.m_acceptJobConfFile = acceptJobConfFile; + } + + public void setJobConfKeyInclusionPatterns( + List<Pattern> jobConfKeyInclusionPatterns) { + this.m_jobConfKeyInclusionPatterns = jobConfKeyInclusionPatterns; + } + + public void setJobConfKeyExclusionPatterns( + List<Pattern> jobConfKeyExclusionPatterns) { + this.m_jobConfKeyExclusionPatterns = jobConfKeyExclusionPatterns; + } + + public void setMustHaveJobConfKeyPatterns(List<Pattern> mustHaveJobConfKeyPatterns) { + this.m_mustHaveJobConfKeyPatterns = mustHaveJobConfKeyPatterns; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("job history file:"); + sb.append(m_acceptJobFile); + sb.append(", job config file:"); + sb.append(m_acceptJobConfFile); + if(m_acceptJobConfFile){ + sb.append(", must contain keys:"); + sb.append(m_mustHaveJobConfKeyPatterns); + sb.append(", include keys:"); + sb.append(m_jobConfKeyInclusionPatterns); + sb.append(", exclude keys:"); + sb.append(m_jobConfKeyExclusionPatterns); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java new file mode 100644 index 0000000..3b303fd --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import org.apache.eagle.jpm.util.HDFSUtil; +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.JobHistoryEndpointConfig; + +public class JobHistoryDAOImpl extends AbstractJobHistoryDAO { + private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class); + + private Configuration m_conf = new Configuration(); + + private FileSystem m_hdfs; + + public JobHistoryDAOImpl(JobHistoryEndpointConfig endpointConfig) throws Exception { + super(endpointConfig.basePath, endpointConfig.pathContainsJobTrackerName, endpointConfig.jobTrackerName); + this.m_conf.set("fs.defaultFS", endpointConfig.nnEndpoint); + this.m_conf.setBoolean("fs.hdfs.impl.disable.cache", true); + if (!endpointConfig.principal.equals("")) { + this.m_conf.set("hdfs.kerberos.principal", endpointConfig.principal); + this.m_conf.set("hdfs.keytab.file", endpointConfig.keyTab); + } + LOG.info("file system:" + endpointConfig.nnEndpoint); + m_hdfs = HDFSUtil.getFileSystem(m_conf); + } + + @Override + public void freshFileSystem() throws Exception { + try { + m_hdfs.close(); + } catch (Exception e) { + + } finally { + m_hdfs = HDFSUtil.getFileSystem(m_conf); + } + } + + @Override + public String calculateJobTrackerName(String basePath) throws Exception { + String latestJobTrackerName = null; + try { + Path hdfsFile = new Path(basePath); + FileStatus[] files = m_hdfs.listStatus(hdfsFile); + + // Sort by modification time as order of desc + Arrays.sort(files, new Comparator<FileStatus>() { + @Override + public int compare(FileStatus o1, FileStatus o2) { + long comp = parseJobTrackerNameTimestamp(o1.getPath().toString()) - parseJobTrackerNameTimestamp(o2.getPath().toString()); + if (comp > 0l) { + return -1; + } else if (comp < 0l) { + return 1; + } + return 0; + } + }); + + for (FileStatus fs : files) { + // back-compatible with hadoop 0.20 + // pick the first directory file which should be the latest modified. + if (fs.isDir()) { + latestJobTrackerName = fs.getPath().getName(); + break; + } + } + } catch(Exception ex) { + LOG.error("fail read job tracker name " + basePath, ex); + throw ex; + } + return latestJobTrackerName == null ? 
"" : latestJobTrackerName; + } + + @Override + public List<String> readSerialNumbers(int year, int month, int day) throws Exception { + List<String> serialNumbers = new ArrayList<>(); + String dailyPath = buildWholePathToYearMonthDay(year, month, day); + LOG.info("crawl serial numbers under one day : " + dailyPath); + try { + Path hdfsFile = new Path(dailyPath); + FileStatus[] files = m_hdfs.listStatus(hdfsFile); + for (FileStatus fs : files) { + if (fs.isDir()) { + serialNumbers.add(fs.getPath().getName()); + } + } + } catch (java.io.FileNotFoundException ex) { + LOG.warn("continue to crawl with failure to find file " + dailyPath); + LOG.debug("continue to crawl with failure to find file " + dailyPath, ex); + // continue to execute + return serialNumbers; + } catch (Exception ex) { + LOG.error("fail reading serial numbers under one day " + dailyPath, ex); + throw ex; + } + StringBuilder sb = new StringBuilder(); + for (String sn : serialNumbers) { + sb.append(sn);sb.append(","); + } + LOG.info("crawled serialNumbers: " + sb); + return serialNumbers; + } + + @SuppressWarnings("deprecation") + @Override + public List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception { + LOG.info("crawl file names under one serial number : " + year + "/" + month + "/" + day + ":" + serialNumber); + List<Pair<Long, String> > jobFileNames = new ArrayList<>(); + String serialPath = buildWholePathToSerialNumber(year, month, day, serialNumber); + try { + Path hdfsFile = new Path(serialPath); + // filter those files which is job configuration file in xml format + FileStatus[] files = m_hdfs.listStatus(hdfsFile, new PathFilter(){ + @Override + public boolean accept(Path path){ + if (path.getName().endsWith(".xml")) + return false; + return true; + } + }); + for (FileStatus fs : files) { + if (!fs.isDir()) { + jobFileNames.add(Pair.of(fs.getModificationTime(), fs.getPath().getName())); + } + } + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + for (Pair<Long, String> sn : jobFileNames) { + sb.append(sn.getRight());sb.append(","); + } + LOG.debug("crawled: " + sb); + } + } catch (Exception ex) { + LOG.error("fail reading job history file names under serial number " + serialPath, ex); + throw ex; + } + return jobFileNames; + } + + /** + * it's the responsibility of caller to close input stream + */ + @Override + public InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception { + String path = buildWholePathToJobHistoryFile(year, month, day, serialNumber, jobHistoryFileName); + LOG.info("Read job history file: " + path); + try { + Path hdfsFile = new Path(path); + return m_hdfs.open(hdfsFile); + } catch(Exception ex) { + LOG.error("fail getting hdfs file inputstream " + path, ex); + throw ex; + } + } + + /** + * it's the responsibility of caller to close input stream + */ + @Override + public InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception { + String path = buildWholePathToJobConfFile(year, month, day, serialNumber,jobHistoryFileName); + if (path == null) return null; + + LOG.info("Read job conf file: " + path); + try { + Path hdfsFile = new Path(path); + return m_hdfs.open(hdfsFile); + } catch(Exception ex) { + LOG.error("fail getting job configuration input stream from " + path, ex); + throw ex; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java new file mode 100644 index 0000000..de8d3f7 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import org.apache.commons.lang3.tuple.Pair; + +import java.io.InputStream; +import java.util.List; + +/** + * Define various operations on job history file resource for lifecycle management + * + * The job history file directory structure supported is as follows: + * <basePath>/<jobTrackerName>/<year>/<month>/<day>/<serialNumber>/<jobHistoryFileName> + * + * In some hadoop version, <jobTrackerName> is not included + * + * The operations involved in resource read + * - list job tracker names under basePath (mostly basePath is configured in entry mapreduce.jobhistory.done-dir of mapred-site.xml) + * - list serial numbers under one day + * - list job history files under one serial number + * - read one job history file + * + */ +public interface JobHistoryLCM { + String calculateJobTrackerName(String basePath) throws Exception; + /** + * @param year + * @param month 0-based or 1-based month depending on hadoop cluster setting + * @param day + * @return + * @throws Exception + */ + List<String> readSerialNumbers(int year, int month, int day) throws Exception; + /** + * @param year + * @param month 0-based or 1-based month depending on hadoop cluster setting + * @param day + * @param serialNumber + * @return + * @throws Exception + */ + List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception; + /** + * @param year + * @param month 0-based or 1-based month depending on hadoop cluster setting + * @param day + * @param serialNumber + * @param jobHistoryFileName + * @param reader + * @throws Exception + */ + void readFileContent(int year, int month, int day, int serialNumber, String jobHistoryFileName, JHFInputStreamCallback reader) throws Exception; + /** + * @param year + * @param month 0-based or 1-based month depending on hadoop cluster setting + * @param day + * @param serialNumber + * @param jobHistoryFileName + * @return + * @throws Exception + */ + InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String 
jobHistoryFileName) throws Exception; + InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobConfFileName) throws Exception; + + /** + * + */ + void freshFileSystem() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java new file mode 100644 index 0000000..e055957 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.crawler; + +import backtype.storm.spout.SpoutOutputCollector; +import org.apache.eagle.dataproc.core.EagleOutputCollector; +import org.apache.eagle.dataproc.core.ValuesArray; + +public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector { + private SpoutOutputCollector m_collector; + + public void setSpoutOutputCollector(SpoutOutputCollector collector) { + this.m_collector = collector; + } + + @Override + public void collect(ValuesArray t) { + m_collector.emit(t); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java new file mode 100755 index 0000000..d49cdef --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JobConfig; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.log.entity.repo.EntityRepository; + +public class JPAEntityRepository extends EntityRepository { + + public JPAEntityRepository() { + serDeserMap.put(JobCounters.class, new JobCountersSerDeser()); + serDeserMap.put(JobConfig.class, new JobConfigSerDeser()); + entitySet.add(JobConfigurationAPIEntity.class); + entitySet.add(JobEventAPIEntity.class); + entitySet.add(JobExecutionAPIEntity.class); + + entitySet.add(TaskAttemptExecutionAPIEntity.class); + entitySet.add(TaskExecutionAPIEntity.class); + entitySet.add(TaskFailureCountAPIEntity.class); + entitySet.add(TaskAttemptCounterAPIEntity.class); + entitySet.add(JobProcessTimeStampEntity.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java new file mode 100644 index 0000000..32c6f7c --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; + +public class JobBaseAPIEntity extends TaggedLogAPIEntity { +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java new file mode 100755 index 0000000..65f535f --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JobConfig; +import org.apache.eagle.log.entity.meta.EntitySerDeser; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +public class JobConfigSerDeser implements EntitySerDeser<JobConfig> { + + @Override + public JobConfig deserialize(byte[] bytes) { + JobConfig jc = new JobConfig(); + Map<String, String> map = new TreeMap<String, String>(); + jc.setConfig(map); + String sb = Bytes.toString(bytes); + String[] keyValue = sb.split(","); + for (String pair : keyValue) { + String str[] = pair.split(":"); + if (pair.equals("") || str[0].equals("")) continue; + String key = str[0]; + String value = ""; + if (str.length == 2) value = str[1]; + map.put(key, value); + } + return jc; + } + + @Override + public byte[] serialize(JobConfig conf) { + Map<String, String> map = conf.getConfig(); + StringBuilder sb = new StringBuilder(); + for (Entry<String, String> entry : map.entrySet()) + sb.append(entry.getKey() + ":" + entry.getValue() + ","); + sb.deleteCharAt(sb.length() - 1); + return sb.toString().getBytes(); + } + + @Override + public Class<JobConfig> type(){ + return JobConfig.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java new file mode 100755 index 0000000..44fa98c --- /dev/null +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.mr.history.common.JobConfig; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa") +@ColumnFamily("f") +@Prefix("jconf") +@Service(JPAConstants.JPA_JOB_CONFIG_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +@Indexes({ + @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true), + @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false) +}) +public class JobConfigurationAPIEntity extends JobBaseAPIEntity { + + @Column("a") + private String configJobName; + @Column("b") + private JobConfig jobConfig; + @Column("c") + private String alertEmailList; + + public JobConfig getJobConfig() { + return jobConfig; + } + public void setJobConfig(JobConfig jobConfig) { + this.jobConfig = jobConfig; + _pcs.firePropertyChange("jobConfig", null, null); + } + public String getConfigJobName() { + return configJobName; + } + public void setConfigJobName(String configJobName) { + this.configJobName = configJobName; + _pcs.firePropertyChange("configJobName", null, null); + } + public String getAlertEmailList() { + return alertEmailList; + } + public void setAlertEmailList(String alertEmailList) { + this.alertEmailList = alertEmailList; + _pcs.firePropertyChange("alertEmailList", null, null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java new file mode 100755 index 0000000..01044bb --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.jobcounter.*; +import org.apache.eagle.log.entity.meta.EntitySerDeser; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +public class JobCountersSerDeser implements EntitySerDeser<JobCounters> { + + private CounterGroupDictionary dictionary = null; + + @Override + public JobCounters deserialize(byte[] bytes) { + JobCounters counters = new JobCounters(); + final int length = bytes.length; + if (length < 4) { + return counters; + } + + final Map<String, Map<String, Long> > groupMap = counters.getCounters(); + int pos = 0; + final int totalGroups = Bytes.toInt(bytes, pos); + pos += 4; + + for (int i = 0; i < totalGroups; ++i) { + final int groupIndex = Bytes.toInt(bytes, pos); + pos += 4; + final int totalCounters = Bytes.toInt(bytes, pos); + pos += 4; + final int nextGroupPos = pos + (totalCounters * 12); + try { + final CounterGroupKey groupKey = getCounterGroup(groupIndex); + if (groupKey == null) { + throw new JobCounterException("Group index " + groupIndex + " is not defined"); + } + final Map<String, Long> counterMap = new TreeMap<String, Long>(); + groupMap.put(groupKey.getName(), counterMap); + for (int j = 0; j < totalCounters; ++j) { + final int counterIndex = Bytes.toInt(bytes, pos); + pos += 4; + final long value = Bytes.toLong(bytes, pos); + pos += 8; + final CounterKey counterKey = groupKey.getCounterKeyByID(counterIndex); + if (counterKey == null) { + continue; + } + counterMap.put(counterKey.getNames().get(0), value); + } + } catch (JobCounterException ex) { + // skip the group + pos = nextGroupPos; + } + } + return counters; + } + + @Override + public byte[] serialize(JobCounters counters) { + + final Map<String, Map<String, Long>> groupMap = counters.getCounters(); + int totalSize = 4; + for (Map<String, Long> counterMap : groupMap.values()) { + final int counterCount = counterMap.size(); + totalSize += counterCount * 12 + 8; + } + byte[] buffer = new byte[totalSize]; + + int totalGroups = 0; + int pos = 0; + int totalGroupNumberPos = pos; + pos += 4; + int nextGroupPos = pos; + + for (Map.Entry<String, Map<String, Long>> entry : groupMap.entrySet()) { + final String groupName = entry.getKey(); + final Map<String, Long> counterMap = entry.getValue(); + try { + nextGroupPos = pos = serializeGroup(buffer, pos, groupName, counterMap); + ++totalGroups; + } catch (JobCounterException ex) { + pos = nextGroupPos; + } + } + + Bytes.putInt(buffer, totalGroupNumberPos, totalGroups); + if (pos < totalSize) { + buffer = Arrays.copyOf(buffer, pos); + } + return buffer; + } + + @Override + public Class<JobCounters> type() { + return JobCounters.class; + } + + private int serializeGroup(byte[] buffer, int currentPos, String groupName, Map<String, Long> counterMap) throws JobCounterException { + int pos = currentPos; + final CounterGroupKey groupKey = getCounterGroup(groupName); + if (groupKey == null) { + throw new JobCounterException("Group name " + groupName + " is not defined"); + } + 
Bytes.putInt(buffer, pos, groupKey.getIndex()); + pos += 4; + int totalCounterNumberPos = pos; + pos += 4; + int totalCounters = 0; + + for (Map.Entry<String, Long> entry : counterMap.entrySet()) { + final String counterName = entry.getKey(); + final CounterKey counterKey = groupKey.getCounterKeyByName(counterName); + if (counterKey == null) { + continue; + } + final Long counterValue = entry.getValue(); + Bytes.putInt(buffer, pos, counterKey.getIndex()); + pos += 4; + Bytes.putLong(buffer, pos, counterValue); + pos += 8; + ++totalCounters; + } + Bytes.putInt(buffer, totalCounterNumberPos, totalCounters); + return pos; + } + + private CounterGroupKey getCounterGroup(String groupName) throws JobCounterException { + if (dictionary == null) { + dictionary = CounterGroupDictionary.getInstance(); + } + final CounterGroupKey groupKey = dictionary.getCounterGroupByName(groupName); + if (groupKey == null) { + throw new JobCounterException("Invalid counter group name: " + groupName); + } + return groupKey; + } + + private CounterGroupKey getCounterGroup(int groupIndex) throws JobCounterException { + if (dictionary == null) { + dictionary = CounterGroupDictionary.getInstance(); + } + return dictionary.getCounterGroupByIndex(groupIndex); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java new file mode 100644 index 0000000..3639ad0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.entities; + +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.log.entity.meta.*; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa") +@ColumnFamily("f") +@Prefix("jevent") +@Service(JPAConstants.JPA_JOB_EVENT_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +public class JobEventAPIEntity extends JobBaseAPIEntity { + + @Column("a") + private String eventType; + + public String getEventType() { + return eventType; + } + public void setEventType(String eventType) { + this.eventType = eventType; + _pcs.firePropertyChange("eventType", null, null); + } +}
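For reference, JobCountersSerDeser above uses a fixed binary layout: a leading 4-byte group count, then for each group a 4-byte group index and a 4-byte counter count, followed by 12 bytes per counter (4-byte counter index plus 8-byte long value). The sketch below only mirrors the size arithmetic of serialize() and is not part of the commit:

    // Layout produced by JobCountersSerDeser.serialize():
    //   [int totalGroups]
    //   per group:   [int groupIndex][int totalCounters]
    //   per counter: [int counterIndex][long counterValue]
    public final class CounterLayoutExample {
        static int serializedSize(int... countersPerGroup) {
            int size = 4;                      // leading group count
            for (int counters : countersPerGroup) {
                size += 8 + counters * 12;     // group header + counter entries
            }
            return size;
        }

        public static void main(String[] args) {
            // Two groups with 3 and 5 counters: 4 + (8 + 36) + (8 + 60) = 116 bytes.
            System.out.println(serializedSize(3, 5));
        }
    }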

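JobConfigSerDeser, further up, stores job configuration as text, one key:value pair per comma-separated entry; deserialize() splits on ',' and then ':', so the format assumes neither delimiter occurs inside a key or value, and serialize() assumes a non-empty map because it unconditionally strips the trailing comma. An illustrative round trip of that encoding, with hypothetical property names:

    import java.util.Map;
    import java.util.TreeMap;

    public class JobConfigEncodingExample {
        public static void main(String[] args) {
            Map<String, String> config = new TreeMap<>();
            config.put("mapreduce.job.queuename", "default");
            config.put("mapreduce.job.reduces", "4");

            // Same encoding as JobConfigSerDeser.serialize(): append "key:value,"
            // per entry, then remove the trailing comma.
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : config.entrySet()) {
                sb.append(e.getKey()).append(':').append(e.getValue()).append(',');
            }
            sb.deleteCharAt(sb.length() - 1);
            System.out.println(sb);  // mapreduce.job.queuename:default,mapreduce.job.reduces:4
        }
    }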