Repository: incubator-eagle
Updated Branches:
  refs/heads/develop e2532a1db -> 2a99ace38
[EAGLE-443] refactor ProcessedTimeStamp

Author: jinhuwu <jinh...@ebay.com>
Author: pkuwm <ihuizhi...@gmail.com>
Author: Zhao, Qingwen <qingwz...@ebay.com>

Closes #326 from wujinhu/EAGLE-443.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2a99ace3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2a99ace3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2a99ace3

Branch: refs/heads/develop
Commit: 2a99ace382a61648f7b1d56c8acd51a98073ba14
Parents: e2532a1
Author: jinhuwu <wujinhu...@126.com>
Authored: Thu Aug 11 11:01:02 2016 +0800
Committer: Qingwen Zhao <qingwen...@gmail.com>
Committed: Thu Aug 11 11:01:02 2016 +0800

----------------------------------------------------------------------
 .../eagle/jpm/mr/history/MRHistoryJobMain.java  |   3 -
 .../history/storm/HistoryJobProgressBolt.java   | 132 -------------------
 .../jpm/mr/history/storm/JobHistorySpout.java   |  75 ++++++++++-
 .../mr/history/zkres/JobHistoryZKStateLCM.java  |   2 +
 .../history/zkres/JobHistoryZKStateManager.java |  49 ++++++-
 5 files changed, 119 insertions(+), 142 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2a99ace3/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index ffa2f22..c6f1b98 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -25,7 +25,6 @@ import 
backtype.storm.topology.TopologyBuilder; import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; -import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt; import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; import org.apache.eagle.jpm.util.Constants; @@ -59,7 +58,6 @@ public class MRHistoryJobMain { topologyName = jhfAppConf.getString("envContextConfig.topologyName"); } String spoutName = "mrHistoryJobExecutor"; - String boltName = "updateProcessTime"; int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName); int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName); if (parallelism > tasks) { @@ -70,7 +68,6 @@ public class MRHistoryJobMain { new JobHistorySpout(filter, jhfConfigManager), parallelism ).setNumTasks(tasks); - topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName); Config config = new backtype.storm.Config(); config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers")); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2a99ace3/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java deleted file mode 100644 index 30374c4..0000000 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.eagle.jpm.mr.history.storm; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; - -import java.util.*; - -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; -import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HistoryJobProgressBolt extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(HistoryJobProgressBolt.class); - - private final static int MAX_RETRY_TIMES = 3; - private Long m_minTimeStamp; - private int m_numTotalPartitions; - private JHFConfigManager configManager; - private Map<Integer, Long> m_partitionTimeStamp = new TreeMap<>(); - public HistoryJobProgressBolt(String parentName, JHFConfigManager configManager) { - this.configManager = configManager; - m_numTotalPartitions = this.configManager.getConfig().getInt("envContextConfig.parallelismConfig." 
+ parentName); - m_minTimeStamp = 0L; - } - @Override - public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { - - } - - @Override - public void execute(Tuple tuple) { - Integer partitionId = tuple.getIntegerByField("partitionId"); - Long timeStamp = tuple.getLongByField("timeStamp"); - LOG.info("partition " + partitionId + ", timeStamp " + timeStamp); - if (!m_partitionTimeStamp.containsKey(partitionId) || (m_partitionTimeStamp.containsKey(partitionId) && m_partitionTimeStamp.get(partitionId) < timeStamp)) { - m_partitionTimeStamp.put(partitionId, timeStamp); - } - - if (m_partitionTimeStamp.size() >= m_numTotalPartitions) { - //get min timestamp - Long minTimeStamp = Collections.min(m_partitionTimeStamp.values()); - - if (m_minTimeStamp == 0L) { - m_minTimeStamp = minTimeStamp; - } - - if (m_minTimeStamp > minTimeStamp) { - //no need to update - return; - } - - m_minTimeStamp = minTimeStamp; - final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); - Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", jobExtractorConfig.site); - } }; - JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity(); - entity.setCurrentTimeStamp(m_minTimeStamp); - entity.setTimestamp(m_minTimeStamp); - entity.setTags(baseTags); - - IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); - - client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); - - List<JobProcessTimeStampEntity> entities = new ArrayList<>(); - entities.add(entity); - - int tried = 0; - while (tried <= MAX_RETRY_TIMES) { - try { - LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + 
entities.size()); - client.create(entities); - LOG.info("finish flushing entities of total number " + entities.size()); - break; - } catch (Exception ex) { - if (tried < MAX_RETRY_TIMES) { - LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex); - } else { - LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex); - } - } - tried ++; - } - - client.getJerseyClient().destroy(); - try { - client.close(); - } catch (Exception e) { - LOG.error("failed to close eagle service client ", e); - } - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - - } - @Override - public void cleanup() { - super.cleanup(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2a99ace3/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index a10599b..a0cdba7 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -26,10 +26,15 @@ import backtype.storm.tuple.Fields; import org.apache.eagle.dataproc.core.ValuesArray; import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; import org.apache.eagle.jpm.mr.history.crawler.*; +import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -86,6 +91,7 @@ public class JobHistorySpout extends BaseRichSpout { private JHFInputStreamCallback callback; private JHFConfigManager configManager; private JobHistoryLCM m_jhfLCM; + private final static int MAX_RETRY_TIMES = 3; public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) { this(filter, configManager, new JobHistorySpoutCollectorInterceptor()); @@ -159,7 +165,8 @@ public class JobHistorySpout extends BaseRichSpout { public void nextTuple() { try { Long modifiedTime = driver.crawl(); - interceptor.collect(new ValuesArray(partitionId, modifiedTime)); + zkState.updateProcessedTimeStamp(partitionId, modifiedTime); + updateProcessedTimeStamp(modifiedTime); } catch (Exception ex) { LOG.error("fail crawling job history file and continue ...", ex); try { @@ -181,7 +188,6 @@ public class JobHistorySpout extends BaseRichSpout { */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("partitionId", "timeStamp")); } /** @@ -205,4 +211,69 @@ public class JobHistorySpout extends BaseRichSpout { @Override public void close() { } + + private void updateProcessedTimeStamp(long modifiedTime) { + if (partitionId != 0) { + return; + } + + //update latest process time + long minTimeStamp = modifiedTime; + for (int i = 1; i < numTotalPartitions; i++) { + long time = zkState.readProcessedTimeStamp(i); + if (time <= minTimeStamp) { + minTimeStamp = time; + } + } + + if (minTimeStamp == 0l) { + return; + } + + LOG.info("update process time stamp {}", minTimeStamp); + final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + Map<String, String> baseTags = new HashMap<String, String>() { { + put("site", jobExtractorConfig.site); + } }; + 
JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity(); + entity.setCurrentTimeStamp(minTimeStamp); + entity.setTimestamp(minTimeStamp); + entity.setTags(baseTags); + + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); + + client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + + List<JobProcessTimeStampEntity> entities = new ArrayList<>(); + entities.add(entity); + + int tried = 0; + while (tried <= MAX_RETRY_TIMES) { + try { + LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + entities.size()); + client.create(entities); + LOG.info("finish flushing entities of total number " + entities.size()); + break; + } catch (Exception ex) { + if (tried < MAX_RETRY_TIMES) { + LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex); + } else { + LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex); + } + } + tried ++; + } + + client.getJerseyClient().destroy(); + try { + client.close(); + } catch (Exception e) { + LOG.error("failed to close eagle service client ", e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2a99ace3/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java index 308057b..933b347 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java @@ 
-28,4 +28,6 @@ public interface JobHistoryZKStateLCM { void addProcessedJob(String date, String jobId); void truncateProcessedJob(String date); void truncateEverything(); + long readProcessedTimeStamp(int partitionId); + void updateProcessedTimeStamp(int partitionId, long timeStamp); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2a99ace3/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java index 24dd7be..33d3cb2 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java @@ -38,6 +38,8 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions"; public static final String ZNODE_FORCE_START_FROM = "forceStartFrom"; public static final String ZNODE_PARTITIONS = "partitions"; + public static final String ZNODE_JOBS = "jobs"; + public static final String ZNODE_TIMESTAMPS = "timeStamps"; public static final int BACKOFF_DAYS = 0; @@ -201,7 +203,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { @Override public String readProcessedDate(int partitionId) { - String path = zkRoot + "/partitions/" + partitionId; + String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId; try { if (_curator.checkExists().forPath(path) != null) { return new String(_curator.getData().forPath(path), "UTF-8"); @@ -216,7 +218,7 @@ public class JobHistoryZKStateManager implements 
JobHistoryZKStateLCM { @Override public void updateProcessedDate(int partitionId, String date) { - String path = zkRoot + "/partitions/" + partitionId; + String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId; try { if (_curator.checkExists().forPath(path) == null) { _curator.create() @@ -234,7 +236,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { @Override public void addProcessedJob(String date, String jobId) { - String path = zkRoot + "/jobs/" + date + "/" + jobId; + String path = zkRoot + "/" + ZNODE_JOBS + "/" + date + "/" + jobId; try { if (_curator.checkExists().forPath(path) == null) { _curator.create() @@ -254,7 +256,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { public void truncateProcessedJob(String date) { LOG.info("trying to truncate all data for day " + date); // we need lock before we do truncate - String path = zkRoot + "/jobs/" + date; + String path = zkRoot + "/" + ZNODE_JOBS + "/" + date; InterProcessMutex lock = new InterProcessMutex(_curator, path); try { lock.acquire(); @@ -277,7 +279,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { @Override public List<String> readProcessedJobs(String date) { - String path = zkRoot + "/jobs/" + date; + String path = zkRoot + "/" + ZNODE_JOBS + "/" + date; try { if (_curator.checkExists().forPath(path) != null) { return _curator.getChildren().forPath(path); @@ -302,4 +304,41 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { throw new RuntimeException(ex); } } + + @Override + public long readProcessedTimeStamp(int partitionId) { + String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS; + try { + if (_curator.checkExists().forPath(path) == null) { + _curator.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + return 0l; + } else { + return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8")); + } + 
} catch (Exception e) { + LOG.error("fail to read timeStamp for partition " + partitionId, e); + throw new RuntimeException(e); + } + } + + @Override + public void updateProcessedTimeStamp(int partitionId, long timeStamp) { + String path = zkRoot + "/" + ZNODE_PARTITIONS + "/" + partitionId + "/" + ZNODE_TIMESTAMPS; + try { + if (_curator.checkExists().forPath(path) == null) { + _curator.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + } + + _curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8")); + } catch (Exception e) { + LOG.error("fail to update timeStamp for partition " + partitionId, e); + throw new RuntimeException(e); + } + } }