Repository: incubator-eagle
Updated Branches:
  refs/heads/master 6a0529e96 -> 4d4d8c0ea
[EAGLE-800] Use InterProcessMutex to sync operation in RunningJobManager

- Use InterProcessMutex to sync operations in RunningJobManager.
- Use siteId to generate the lock path for InterProcessMutex.
- Fix some checkstyle problems.

https://issues.apache.org/jira/browse/EAGLE-800

Author: r7raul1984 <tangji...@yhd.com>

Closes #684 from r7raul1984/EAGLE-800.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4d4d8c0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4d4d8c0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4d4d8c0e

Branch: refs/heads/master
Commit: 4d4d8c0eafa06d537a307291df03f344cdae6ef5
Parents: 6a0529e
Author: r7raul1984 <tangji...@yhd.com>
Authored: Fri Nov 25 21:52:40 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Fri Nov 25 21:52:40 2016 +0800

----------------------------------------------------------------------
 .../impl/storm/zookeeper/ZKStateConfig.java | 1 +
 .../jpm/mr/running/MRRunningJobConfig.java | 3 +
 .../mr/running/recover/MRRunningJobManager.java | 2 +-
 .../mr/running/storm/MRRunningJobParseBolt.java | 6 +-
 .../mr/running/MRRunningJobApplicationTest.java | 15 ++-
 .../jpm/mr/running/MRRunningJobManagerTest.java | 120 +++++++++++++++++++
 .../jpm/mr/running/parser/MRJobParserTest.java | 2 +
 .../src/test/resources/mrconf_30784.xml | 17 ++-
 .../spark/running/SparkRunningJobAppConfig.java | 3 +
 .../running/recover/SparkRunningJobManager.java | 2 +-
 .../running/storm/SparkRunningJobParseBolt.java | 14 ++-
 .../java/org/apache/eagle/jpm/util/Utils.java | 12 +-
 .../jpm/util/jobrecover/RunningJobManager.java | 35 +++---
 .../org/apache/eagle/jpm/util/UtilsTest.java | 19 +++
 .../hive/HiveQueryMonitoringApplication.java | 14 +--
 .../hive/config/RunningJobCrawlConfig.java | 8 +-
 .../hive/jobrunning/HiveJobFetchSpout.java | 21 ++-
 ...HiveJobRunningSourcedStormSpoutProvider.java | 53 ++++----
 18 files changed, 266 insertions(+), 81 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java index f9515f5..53df455 100644 --- a/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java +++ b/eagle-core/eagle-data-process/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java @@ -25,4 +25,5 @@ public class ZKStateConfig implements Serializable { public int zkSessionTimeoutMs; public int zkRetryTimes; public int zkRetryInterval; + public String zkLockPath; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java index 975e821..119867d 100644 ---
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java @@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running; import org.apache.eagle.common.config.ConfigOptionParser; import com.typesafe.config.Config; +import org.apache.eagle.jpm.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,7 @@ public class MRRunningJobConfig implements Serializable { public int zkSessionTimeoutMs; public int zkRetryTimes; public int zkRetryInterval; + public String zkLockPath; } public static class EagleServiceConfig implements Serializable { @@ -108,6 +110,7 @@ public class MRRunningJobConfig implements Serializable { this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes"); this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval"); this.zkStateConfig.zkRoot = ZK_ROOT_PREFIX + "/" + config.getString("siteId"); + this.zkStateConfig.zkLockPath = Utils.makeLockPath(this.zkStateConfig.zkRoot); // parse eagle service endpoint this.eagleServiceConfig.eagleServiceHost = config.getString("service.host"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java index 20a8701..70e6fda 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java @@ -34,7 +34,7 @@ public class MRRunningJobManager implements Serializable { public MRRunningJobManager(MRRunningJobConfig.ZKStateConfig config) { this.runningJobManager = new RunningJobManager(config.zkQuorum, - config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot); + config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot, config.zkLockPath); } public Map<String, JobExecutionAPIEntity> recoverYarnApp(String appId) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index e2767d8..8ec2dec 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -75,8 +75,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { - AppInfo appInfo = (AppInfo)tuple.getValue(1); - Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>)tuple.getValue(2); + AppInfo appInfo = (AppInfo) tuple.getValue(1); + Map<String, 
JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>) tuple.getValue(2); LOG.info("get mr yarn application " + appInfo.getId()); @@ -100,7 +100,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt { }); if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) - || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) { + || applicationParser.status() == MRJobParser.ParserStatus.FINISHED) { applicationParser.setStatus(MRJobParser.ParserStatus.RUNNING); executorService.execute(applicationParser); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index 5d78a50..787c9ac 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -58,15 +58,18 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; @PowerMockIgnore({"javax.*"}) public class MRRunningJobApplicationTest { - public static final String RM_URL = "http://sandbox.hortonworks.com:50030/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true"; - public static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]"; - public static final String TUPLE_1 = "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]"; - public static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]"; + private static final String RM_URL = "http://sandbox.hortonworks.com:50030/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=RUNNING&anonymous=true"; + private static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]"; + private static final String TUPLE_1 
= "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]"; + private static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]"; private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); + private static Config config = ConfigFactory.load(); + private static String siteId; @BeforeClass public static void setupMapper() throws Exception { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); + siteId = config.getString("siteId"); } @@ -77,8 +80,6 @@ public class MRRunningJobApplicationTest { when(Executors.newFixedThreadPool(anyInt())).thenReturn(executorService); - MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class); - PowerMockito.whenNew(MRRunningJobManager.class).withArguments(any()).thenReturn(mrRunningJobManager); Config config = ConfigFactory.load(); MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config); List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig); @@ -88,6 +89,8 @@ public class MRRunningJobApplicationTest { mrRunningJobConfig.getZkStateConfig(), confKeyKeys, config); + MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class); + PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager); mrRunningJobParseBolt.prepare(null, null, null); InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json"); AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java new file mode 100644 index 0000000..55f76e2 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java @@ -0,0 +1,120 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.mr.running; + +import com.typesafe.config.ConfigFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.CloseableUtils; +import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; +import org.apache.eagle.jpm.util.jobrecover.RunningJobManager; +import org.apache.zookeeper.CreateMode; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.mockStatic; + + +@RunWith(PowerMockRunner.class) +@PrepareForTest({MRRunningJobManager.class, RunningJobManager.class, LoggerFactory.class}) +@PowerMockIgnore({"javax.*"}) +public class MRRunningJobManagerTest { + private static TestingServer zk; + private static com.typesafe.config.Config config = ConfigFactory.load(); + private static CuratorFramework curator; + private static final String SHARE_RESOURCES = "/apps/mr/running/sandbox/yarnAppId/jobId"; + private static final int QTY = 5; + private static final int REPETITIONS = QTY * 10; + private static MRRunningJobConfig.EndpointConfig endpointConfig; + private static MRRunningJobConfig.ZKStateConfig zkStateConfig; + private static org.slf4j.Logger log = mock(org.slf4j.Logger.class); + + @BeforeClass + public static void setupZookeeper() throws Exception { + zk = new TestingServer(); + curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new ExponentialBackoffRetry(1000, 3)); + curator.start(); + curator.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(SHARE_RESOURCES); + MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config); + zkStateConfig = mrRunningJobConfig.getZkStateConfig(); + zkStateConfig.zkQuorum = zk.getConnectString(); + endpointConfig = mrRunningJobConfig.getEndpointConfig(); + mockStatic(LoggerFactory.class); + when(LoggerFactory.getLogger(any(Class.class))).thenReturn(log); + } + + @AfterClass + public static void teardownZookeeper() 
throws Exception { + if (curator.checkExists().forPath(SHARE_RESOURCES) != null) { + curator.delete().deletingChildrenIfNeeded().forPath(SHARE_RESOURCES); + } + CloseableUtils.closeQuietly(curator); + CloseableUtils.closeQuietly(zk); + } + + @Test + public void testMRRunningJobManagerDelWithLock() throws Exception { + Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) != null); + + ExecutorService service = Executors.newFixedThreadPool(QTY); + for (int i = 0; i < QTY; ++i) { + Callable<Void> task = () -> { + try { + MRRunningJobManager mrRunningJobManager = new MRRunningJobManager(zkStateConfig); + for (int j = 0; j < REPETITIONS; ++j) { + mrRunningJobManager.delete("yarnAppId", "jobId"); + } + } catch (Exception e) { + e.printStackTrace(); + // log or do something + } + return null; + }; + service.submit(task); + } + + service.shutdown(); + service.awaitTermination(10, TimeUnit.MINUTES); + Assert.assertTrue(curator.checkExists().forPath(SHARE_RESOURCES) == null); + verify(log, never()).error(anyString(), anyString(), anyString(), anyString(), any(Throwable.class)); + verify(log, never()).error(anyString(), anyString(), anyString()); + verify(log, never()).error(anyString(), any(Throwable.class)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java index 4b00bb2..561d858 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java @@ -72,6 +72,7 @@ public class MRJobParserTest { private static final String DATA_FROM_ZK = "{\"entityTags\":\"{\\\"jobName\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"jobId\\\":\\\"job_1479206441898_30784\\\",\\\"site\\\":\\\"sandbox\\\",\\\"jobDefId\\\":\\\"eagletest\\\",\\\"jobType\\\":\\\"HIVE\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\"}\",\"appInfo\":\"{\\\"applicationType\\\":\\\"MAPREDUCE\\\",\\\"startedTime\\\":\\\"1479328221694\\\",\\\"finalStatus\\\":\\\"UNDEFINED\\\",\\\"trackingUrl\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/proxy\\\\\\/application_1479206441898_30784\\\\\\/\\\",\\\"runningContainers\\\":\\\"2\\\",\\\"trackingUI\\\":\\\"ApplicationMaster\\\",\\\"clusterId\\\":\\\"1479206441898\\\",\\\"amContainerLogs\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/node\\\\\\/containerlogs\\\\\\/container_e11_1479206441898_30784_01_000001\\\\\\/xxx\\\",\\\"allocatedVCores\\\":\\\"2\\\",\\\"diagnostics\\\":\\\"\\\",\\\ "name\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"progress\\\":\\\"95.0\\\",\\\"finishedTime\\\":\\\"0\\\",\\\"allocatedMB\\\":\\\"3072\\\",\\\"id\\\":\\\"application_1479206441898_30784\\\",\\\"state\\\":\\\"RUNNING\\\",\\\"amHostHttpAddress\\\":\\\"host.domain.com:8088\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\",\\\"elapsedTime\\\":\\\"13367402\\\"}\"}"; private static TestingServer zk; private static String ZKROOT; + private static String siteId; private 
static MRRunningJobConfig mrRunningJobConfig; private static Config config = ConfigFactory.load(); private static CuratorFramework curator; @@ -83,6 +84,7 @@ public class MRJobParserTest { zk = new TestingServer(); curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(1)); mrRunningJobConfig = MRRunningJobConfig.newInstance(config); + siteId = mrRunningJobConfig.getEndpointConfig().site; mrRunningJobConfig.getZkStateConfig().zkQuorum = zk.getConnectString(); ZKROOT = mrRunningJobConfig.getZkStateConfig().zkRoot; OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml index 78d61b5..66da734 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml @@ -1 +1,16 @@ -<conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job. xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf> \ No newline at end of file +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--><conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job. xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java index 3ae4a35..c5ec6ce 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java @@ -20,6 +20,7 @@ package org.apache.eagle.jpm.spark.running; import com.typesafe.config.ConfigValue; import com.typesafe.config.Config; +import org.apache.eagle.jpm.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public class SparkRunningJobAppConfig implements Serializable { public int zkRetryTimes; public int zkRetryInterval; public boolean recoverEnabled; + public String zkLockPath; } public static class EagleServiceConfig implements Serializable { @@ -119,6 +121,7 @@ public class SparkRunningJobAppConfig implements Serializable { this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes"); this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs"); this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT; + this.zkStateConfig.zkLockPath = Utils.makeLockPath(DEFAULT_SPARK_JOB_RUNNING_ZOOKEEPER_ROOT + "/" + config.getString("siteId")); if (config.hasPath("zookeeper.zkRoot")) { this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot"); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java index 3fb6371..4fbf53b 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java @@ -33,7 +33,7 @@ public class SparkRunningJobManager implements Serializable { public SparkRunningJobManager(SparkRunningJobAppConfig.ZKStateConfig config) { this.runningJobManager = new RunningJobManager(config.zkQuorum, - config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot); + config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot, config.zkLockPath); } public Map<String, SparkAppEntity> recoverYarnApp(String appId) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java index 9c0ffef..0a74348 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java @@ -74,14 +74,20 @@ public class SparkRunningJobParseBolt extends BaseRichBolt { @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { - AppInfo appInfo = (AppInfo)tuple.getValue(1); - Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>)tuple.getValue(2); + AppInfo appInfo = (AppInfo) tuple.getValue(1); + Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>) tuple.getValue(2); LOG.info("get spark yarn application " + appInfo.getId()); SparkApplicationParser applicationParser; if (!runningSparkParsers.containsKey(appInfo.getId())) { - applicationParser = new SparkApplicationParser(eagleServiceConfig, endpointConfig, jobExtractorConfig, appInfo, sparkApp, new SparkRunningJobManager(zkStateConfig), resourceFetcher); + applicationParser = new SparkApplicationParser(eagleServiceConfig, + endpointConfig, + jobExtractorConfig, + appInfo, + sparkApp, + new SparkRunningJobManager(zkStateConfig), + resourceFetcher); runningSparkParsers.put(appInfo.getId(), applicationParser); LOG.info("create application parser for {}", appInfo.getId()); } else { @@ -97,7 +103,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt { }); if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) - || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) { + || applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) { applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING); executorService.execute(applicationParser); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java index 91077df..9025d36 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java @@ -18,6 +18,8 @@ package org.apache.eagle.jpm.util; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +29,6 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; public class Utils { @@ -86,11 +87,11 @@ public class Utils { int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1)); return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB; } - LOG.warn("Cannot parse memory info " + memory); + LOG.warn("Cannot parse memory info " + memory); return 0L; } - + public static Constants.JobType fetchJobType(Map config) { if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; @@ -112,4 +113,9 @@ public class Utils { config.forEach(entry -> mapConfig.put(entry.getKey(), entry.getValue())); return fetchJobType(mapConfig); } + + public static String makeLockPath(String zkrootWithSiteId) { + Preconditions.checkArgument(StringUtils.isNotBlank(zkrootWithSiteId), "zkrootWithSiteId must not be blank"); + return zkrootWithSiteId.toLowerCase() + "/locks"; + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java index a2d97bf..1857707 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java @@ -18,6 +18,8 @@ package org.apache.eagle.jpm.util.jobrecover; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo; import org.apache.commons.lang3.tuple.Pair; import org.apache.curator.framework.CuratorFramework; @@ -39,8 +41,9 @@ public class RunningJobManager implements Serializable { private static final String ENTITY_TAGS_KEY = "entityTags"; private static final String APP_INFO_KEY = "appInfo"; private static final String ZNODE_LAST_FINISH_TIME = "lastFinishTime"; + private final InterProcessMutex lock; - private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) throws Exception { + private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) { return CuratorFrameworkFactory.newClient( zkQuorum, zkSessionTimeoutMs, @@ -49,12 +52,17 @@ public class RunningJobManager implements Serializable { ); } - public RunningJobManager(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval, String zkRoot) { + public RunningJobManager(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval, String zkRoot, 
String lockPath) { this.zkRoot = zkRoot; - + curator = newCurator(zkQuorum, zkSessionTimeoutMs, zkRetryTimes, zkRetryInterval); try { - curator = newCurator(zkQuorum, zkSessionTimeoutMs, zkRetryTimes, zkRetryInterval); curator.start(); + } catch (Exception e) { + LOG.error("curator start error {}", e); + } + LOG.info("InterProcessMutex lock path is " + lockPath); + lock = new InterProcessMutex(curator, lockPath); + try { if (curator.checkExists().forPath(this.zkRoot) == null) { curator.create() .creatingParentsIfNeeded() @@ -142,7 +150,6 @@ public class RunningJobManager implements Serializable { public boolean update(String yarnAppId, String jobId, Map<String, String> tags, AppInfo app) { String path = this.zkRoot + "/" + yarnAppId + "/" + jobId; - //InterProcessMutex lock = new InterProcessMutex(curator, path); Map<String, String> appInfo = new HashMap<>(); appInfo.put("id", app.getId()); appInfo.put("user", app.getUser()); @@ -169,7 +176,7 @@ public class RunningJobManager implements Serializable { fields.put(ENTITY_TAGS_KEY, (new JSONObject(tags)).toString()); fields.put(APP_INFO_KEY, (new JSONObject(appInfo)).toString()); try { - //lock.acquire(); + lock.acquire(); JSONObject object = new JSONObject(fields); if (curator.checkExists().forPath(path) == null) { curator.create() @@ -183,7 +190,7 @@ public class RunningJobManager implements Serializable { LOG.error("failed to update job {} for yarn app {} ", jobId, yarnAppId); } finally { try { - //lock.release(); + lock.release(); } catch (Exception e) { LOG.error("fail releasing lock", e); } @@ -193,9 +200,8 @@ public class RunningJobManager implements Serializable { public void delete(String yarnAppId, String jobId) { String path = this.zkRoot + "/" + yarnAppId + "/" + jobId; - //InterProcessMutex lock = new InterProcessMutex(curator, path); try { - //lock.acquire(); + lock.acquire(); if (curator.checkExists().forPath(path) != null) { curator.delete().deletingChildrenIfNeeded().forPath(path); LOG.info("delete job {} for yarn app {}, path {} ", jobId, yarnAppId, path); @@ -208,7 +214,7 @@ public class RunningJobManager implements Serializable { LOG.error("failed to delete job {} for yarn app {}, path {}, {}", jobId, yarnAppId, path, e); } finally { try { - //lock.release(); + lock.release(); } catch (Exception e) { LOG.error("fail releasing lock", e); @@ -218,9 +224,8 @@ public class RunningJobManager implements Serializable { public void delete(String yarnAppId) { String path = this.zkRoot + "/" + yarnAppId; - //InterProcessMutex lock = new InterProcessMutex(curator, path); try { - //lock.acquire(); + lock.acquire(); if (curator.checkExists().forPath(path) != null) { curator.delete().forPath(path); LOG.info("delete yarn app {}, path {} ", yarnAppId, path); @@ -229,7 +234,7 @@ public class RunningJobManager implements Serializable { LOG.error("failed to delete yarn app {}, path {} ", yarnAppId, path); } finally { try { - //lock.release(); + lock.release(); } catch (Exception e) { LOG.error("fail releasing lock", e); } @@ -243,14 +248,14 @@ public class RunningJobManager implements Serializable { while (keysItr.hasNext()) { String key = keysItr.next(); result.put(key, new HashMap<>()); - String value = (String)object.get(key); + String value = (String) object.get(key); JSONObject jsonObject = new JSONObject(value); Map<String, String> items = result.get(key); Iterator<String> keyItemItr = jsonObject.keys(); while (keyItemItr.hasNext()) { String itemKey = keyItemItr.next(); - items.put(itemKey, (String)jsonObject.get(itemKey)); + 
items.put(itemKey, (String) jsonObject.get(itemKey)); } } return result; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java index b29a8e0..8e89edf 100644 --- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java +++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/UtilsTest.java @@ -64,4 +64,23 @@ public class UtilsTest { thrown.expect(IllegalArgumentException.class); Utils.parseMemory("0.1g"); } + + @Test + public void testMakeLockPath() { + String lockpath = Utils.makeLockPath("/apps/mr/running/sitdId"); + Assert.assertEquals("/apps/mr/running/sitdid/locks", lockpath); + } + + @Test + public void testMakeLockPath1() { + thrown.expect(IllegalArgumentException.class); + Utils.makeLockPath(""); + } + + @Test + public void testMakeLockPath2() { + thrown.expect(IllegalArgumentException.class); + Utils.makeLockPath(null); + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java index 6abea3d..7a4509b 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java @@ -36,11 +36,11 @@ import org.apache.eagle.security.hive.sensitivity.HiveSensitivityDataEnrichBolt; * Since 8/11/16. 
*/ public class HiveQueryMonitoringApplication extends StormApplication { - public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; - public final static String FILTER_TASK_NUM = "topology.numOfFilterTasks"; - public final static String PARSER_TASK_NUM = "topology.numOfParserTasks"; - public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks"; - public final static String SINK_TASK_NUM = "topology.numOfSinkTasks"; + private static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; + private static final String FILTER_TASK_NUM = "topology.numOfFilterTasks"; + private static final String PARSER_TASK_NUM = "topology.numOfParserTasks"; + private static final String JOIN_TASK_NUM = "topology.numOfJoinTasks"; + private static final String SINK_TASK_NUM = "topology.numOfSinkTasks"; @Override public StormTopology execute(Config config, StormEnvironment environment) { @@ -68,13 +68,13 @@ public class HiveQueryMonitoringApplication extends StormApplication { BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks); joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("user")); - StormStreamSink sinkBolt = environment.getStreamSink("hive_query_stream",config); + StormStreamSink sinkBolt = environment.getStreamSink("hive_query_stream", config); BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks); kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user")); return builder.createTopology(); } - public static void main(String[] args){ + public static void main(String[] args) { Config config = ConfigFactory.load(); HiveQueryMonitoringApplication app = new HiveQueryMonitoringApplication(); app.run(config); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java index 2662698..35df281 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java @@ -21,25 +21,25 @@ import org.apache.eagle.jpm.util.JobIdPartitioner; import java.io.Serializable; -public class RunningJobCrawlConfig implements Serializable{ +public class RunningJobCrawlConfig implements Serializable { private static final long serialVersionUID = 1L; public RunningJobEndpointConfig endPointConfig; public ControlConfig controlConfig; public ZKStateConfig zkStateConfig; - public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig){ + public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig) { this.endPointConfig = endPointConfig; this.controlConfig = controlConfig; this.zkStateConfig = zkStateConfig; } - public static class RunningJobEndpointConfig implements Serializable{ + public static class RunningJobEndpointConfig implements Serializable { private static final long serialVersionUID = 1L; public String[] RMBasePaths; public String HSBasePath; } - public static class ControlConfig 
implements Serializable{ + public static class ControlConfig implements Serializable { private static final long serialVersionUID = 1L; public boolean jobConfigEnabled; public boolean jobInfoEnabled; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java index af4599b..5f54f30 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java @@ -94,26 +94,27 @@ public class HiveJobFetchSpout extends BaseRichSpout { this.partitionId = calculatePartitionId(context); // sanity verify 0<=partitionId<=numTotalPartitions-1 if (partitionId < 0 || partitionId > crawlConfig.controlConfig.numTotalPartitions) { - throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + - partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions); + throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + + partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions); } Class<? extends JobIdPartitioner> partitionerCls = crawlConfig.controlConfig.partitionerCls; try { this.jobFilter = new JobIdFilterByPartition(partitionerCls.newInstance(), - crawlConfig.controlConfig.numTotalPartitions, partitionId); + crawlConfig.controlConfig.numTotalPartitions, partitionId); } catch (Exception e) { LOG.error("failing instantiating job partitioner class " + partitionerCls.getCanonicalName()); throw new IllegalStateException(e); } this.collector = collector; this.runningJobManager = new RunningJobManager(crawlConfig.zkStateConfig.zkQuorum, - crawlConfig.zkStateConfig.zkSessionTimeoutMs, - crawlConfig.zkStateConfig.zkRetryTimes, - crawlConfig.zkStateConfig.zkRetryInterval, - crawlConfig.zkStateConfig.zkRoot); + crawlConfig.zkStateConfig.zkSessionTimeoutMs, + crawlConfig.zkStateConfig.zkRetryTimes, + crawlConfig.zkStateConfig.zkRetryInterval, + crawlConfig.zkStateConfig.zkRoot, + crawlConfig.zkStateConfig.zkLockPath); this.lastFinishAppTime = this.runningJobManager.recoverLastFinishedTime(partitionId); - if (this.lastFinishAppTime == 0l) { - this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000l;//one day ago + if (this.lastFinishAppTime == 0L) { + this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000L;//one day ago this.runningJobManager.updateLastFinishTime(partitionId, this.lastFinishAppTime); } } @@ -129,7 +130,7 @@ public class HiveJobFetchSpout extends BaseRichSpout { handleApps(apps, true); long fetchTime = Calendar.getInstance().getTimeInMillis(); - if (fetchTime - this.lastFinishAppTime > 60000l) { + if (fetchTime - this.lastFinishAppTime > 60000L) { apps = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, Long.toString(this.lastFinishAppTime)); if (apps == null) { apps = new ArrayList<>(); 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4d4d8c0e/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java index 71f5949..c8b1f61 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java @@ -18,6 +18,7 @@ package org.apache.eagle.security.hive.jobrunning; import backtype.storm.topology.base.BaseRichSpout; import org.apache.eagle.jpm.util.DefaultJobIdPartitioner; +import org.apache.eagle.jpm.util.Utils; import org.apache.eagle.security.hive.config.RunningJobCrawlConfig; import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.ControlConfig; import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.RunningJobEndpointConfig; @@ -27,19 +28,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HiveJobRunningSourcedStormSpoutProvider { - private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class); + private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningSourcedStormSpoutProvider.class); - public BaseRichSpout getSpout(Config config, int parallelism){ - RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig(); - String RMEndPoints = config.getString("dataSourceConfig.RMEndPoints"); - endPointConfig.RMBasePaths = RMEndPoints.split(","); + public BaseRichSpout getSpout(Config config, int parallelism) { + RunningJobEndpointConfig endPointConfig = new RunningJobEndpointConfig(); + String RMEndPoints = config.getString("dataSourceConfig.RMEndPoints"); + endPointConfig.RMBasePaths = RMEndPoints.split(","); - String HSEndPoint = config.getString("dataSourceConfig.HSEndPoint"); - endPointConfig.HSBasePath = HSEndPoint; + String HSEndPoint = config.getString("dataSourceConfig.HSEndPoint"); + endPointConfig.HSBasePath = HSEndPoint; - ControlConfig controlConfig = new ControlConfig(); - controlConfig.jobInfoEnabled = true; - controlConfig.jobConfigEnabled = true; + ControlConfig controlConfig = new ControlConfig(); + controlConfig.jobInfoEnabled = true; + controlConfig.jobConfigEnabled = true; controlConfig.numTotalPartitions = parallelism <= 0 ? 1 : parallelism; boolean zkCleanupTimeSet = config.hasPath("dataSourceConfig.zkCleanupTimeInday"); @@ -56,24 +57,24 @@ public class HiveJobRunningSourcedStormSpoutProvider { controlConfig.sizeOfJobCompletedInfoQueue = sizeOfJobCompletedInfoQueue ? config.getInt("dataSourceConfig.sizeOfJobCompletedInfoQueue") : 10000; //controlConfig.numTotalPartitions = parallelism == null ? 
1 : parallelism; - ZKStateConfig zkStateConfig = new ZKStateConfig(); - zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum"); - zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot"); - zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs"); - zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes"); - zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval"); - RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig); + ZKStateConfig zkStateConfig = new ZKStateConfig(); + zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum"); + zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot"); + zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs"); + zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes"); + zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval"); + zkStateConfig.zkLockPath = Utils.makeLockPath(zkStateConfig.zkRoot + "/" + config.getString("siteId")); + RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig); - try{ - controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>)Class.forName(config.getString("dataSourceConfig.partitionerCls")); - } - catch(Exception ex){ - LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls")); - //throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls")); + try { + controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls")); + } catch (Exception ex) { + LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls")); + //throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls")); controlConfig.partitionerCls = DefaultJobIdPartitioner.class; } - HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig); - return spout; - } + HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig); + return spout; + } }
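
For readers skimming the patch, the core of the change is the locking pattern in RunningJobManager: each ZooKeeper update/delete now runs while holding a shared Curator InterProcessMutex, and the mutex path is derived from the site-scoped zkRoot (lower-cased, with a "/locks" suffix, as Utils.makeLockPath does). Below is a minimal stand-alone sketch of that pattern, not code from the patch; the class name, the example zkRoot, and the localhost:2181 connect string are illustrative assumptions.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.RetryNTimes;

    // Illustrative class, not part of the patch.
    public class LockedZkDeleteSketch {

        // Mirrors Utils.makeLockPath(): site-scoped zkRoot, lower-cased, plus "/locks".
        static String makeLockPath(String zkRootWithSiteId) {
            return zkRootWithSiteId.toLowerCase() + "/locks";
        }

        public static void main(String[] args) throws Exception {
            String zkRoot = "/apps/mr/running/sandbox";      // assumed site-scoped root
            CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    "localhost:2181", 15000, 15000, new RetryNTimes(3, 1000));
            curator.start();

            // One mutex per manager instance, all sharing the same lock path, so
            // concurrent delete/update calls across processes are serialized.
            InterProcessMutex lock = new InterProcessMutex(curator, makeLockPath(zkRoot));

            String path = zkRoot + "/application_x/job_y";   // hypothetical yarnAppId/jobId node
            lock.acquire();
            try {
                if (curator.checkExists().forPath(path) != null) {
                    curator.delete().deletingChildrenIfNeeded().forPath(path);
                }
            } finally {
                lock.release();                              // always release, as the patch does in its finally blocks
            }

            curator.close();
        }
    }

The mutex is created once per RunningJobManager and reused for every call, so bolts or spouts pointing at the same site serialize their ZooKeeper writes; the new MRRunningJobManagerTest exercises this by invoking delete() from several threads against a Curator TestingServer and asserting that no errors are logged.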