This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c0290399fcd514b05aafdc792dd01b9773176f06 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Tue Nov 1 15:51:55 2022 +0800 KYLIN-5368 Fix jobs on YARN not being killed when the KE node is stopped in yarn cluster mode --- .../rest/ISmartApplicationListenerForSystem.java | 32 ++++++++++++ .../apache/kylin/metadata/epoch/EpochManager.java | 5 ++ .../kylin/metadata/epoch/EpochManagerTest.java | 22 ++++++++ .../org/apache/kylin/rest/service/JobService.java | 34 +++++++++++- .../apache/kylin/rest/service/JobServiceTest.java | 61 ++++++++++++++++++++++ .../org/apache/kylin/rest/BootstrapServer.java | 9 +++- 6 files changed, 160 insertions(+), 3 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/ISmartApplicationListenerForSystem.java b/src/common-service/src/main/java/org/apache/kylin/rest/ISmartApplicationListenerForSystem.java new file mode 100644 index 0000000000..8fc5f026e7 --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/ISmartApplicationListenerForSystem.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.rest; + +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.SmartApplicationListener; + +public interface ISmartApplicationListenerForSystem extends SmartApplicationListener { + + @Override + default boolean supportsEventType(Class<? extends ApplicationEvent> eventType) { + return eventType == ApplicationReadyEvent.class || eventType == ContextClosedEvent.class; + } + +} \ No newline at end of file diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java index bb7f2c3d2f..cbe1ba9590 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java @@ -814,4 +814,9 @@ public class EpochManager { return null; }); } + + public List<Epoch> getOwnedEpochs() { + return epochStore.list().stream().filter(this::checkEpochOwnerOnly).collect(Collectors.toList()); + } + } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java index 164d952020..d488dfe67b 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java @@ -214,6 +214,28 @@ class EpochManagerTest { } + @Test + void testGetOwnedEpochs() { + Epoch e1 = new Epoch(); + e1.setEpochTarget("test1"); + e1.setCurrentEpochOwner("owner1"); + + Epoch e2 = new Epoch(); + e2.setEpochTarget("test2"); + e2.setCurrentEpochOwner("owner2"); + + Epoch e3 = new Epoch(); + e3.setEpochTarget("test3"); + e3.setCurrentEpochOwner("owner2"); + + 
getEpochStore().insertBatch(Arrays.asList(e1, e2, e3)); + + EpochManager epochManager = EpochManager.getInstance(); + epochManager.setIdentity("owner2"); + + Assertions.assertEquals(2, epochManager.getOwnedEpochs().size()); + } + @Test void testForceUpdateEpoch() { EpochManager epochManager = EpochManager.getInstance(); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 08ef663424..22424681c5 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -47,6 +47,7 @@ import java.util.stream.Stream; import javax.servlet.http.HttpServletRequest; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.io.IOUtils; @@ -66,6 +67,7 @@ import org.apache.kylin.common.metrics.MetricsGroup; import org.apache.kylin.common.metrics.MetricsName; import org.apache.kylin.common.msg.Message; import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.common.persistence.metadata.Epoch; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.persistence.transaction.UnitOfWorkContext; import org.apache.kylin.common.scheduler.EventBusFactory; @@ -107,6 +109,7 @@ import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.rest.ISmartApplicationListenerForSystem; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.JobFilter; import org.apache.kylin.rest.request.JobUpdateRequest; @@ -122,6 +125,8 @@ import 
org.apache.kylin.rest.util.SparkHistoryUIUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.http.HttpHeaders; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; @@ -141,7 +146,7 @@ import lombok.val; import lombok.var; @Component("jobService") -public class JobService extends BasicService implements JobSupporter { +public class JobService extends BasicService implements JobSupporter, ISmartApplicationListenerForSystem { @Autowired private ProjectService projectService; @@ -1352,6 +1357,33 @@ public class JobService extends BasicService implements JobSupporter { MsgPicker.setMsg("en"); } } + + @Override + public void onApplicationEvent(ApplicationEvent event) { + if (event instanceof ContextClosedEvent) { + logger.info("Stop kyligence node, kill job on yarn for yarn cluster mode"); + EpochManager epochManager = EpochManager.getInstance(); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + List<Epoch> ownedEpochs = epochManager.getOwnedEpochs(); + + for (Epoch epoch : ownedEpochs) { + String project = epoch.getEpochTarget(); + NExecutableManager executableManager = NExecutableManager.getInstance(kylinConfig, project); + if (executableManager != null) { + List<ExecutablePO> allJobs = executableManager.getAllJobs(); + for (ExecutablePO executablePO : allJobs) { + executableManager.cancelRemoteJob(executablePO); + } + } + } + } + } + + @Override + public int getOrder() { + return HIGHEST_PRECEDENCE; + } + @Setter @Getter static class ExecutablePOSortBean { diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index ee6db5a9be..71266afd4c 100644 --- 
a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -57,11 +57,14 @@ import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.persistence.metadata.Epoch; +import org.apache.kylin.common.persistence.metadata.EpochStore; import org.apache.kylin.common.persistence.transaction.TransactionException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.ClassUtil; @@ -89,6 +92,7 @@ import org.apache.kylin.job.execution.JobTypeEnum; import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.job.execution.StageBase; import org.apache.kylin.job.execution.SucceedChainedTestExecutable; +import org.apache.kylin.job.execution.SucceedDagTestExecutable; import org.apache.kylin.job.execution.SucceedTestExecutable; import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.cube.model.NDataflowManager; @@ -125,7 +129,9 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.context.ApplicationEvent; import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.core.type.filter.AssignableTypeFilter; import org.springframework.http.HttpHeaders; import org.springframework.security.authentication.TestingAuthenticationToken; @@ -177,6 +183,9 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase { @Mock private ProjectService projectService = Mockito.spy(ProjectService.class); + @Mock + private ApplicationEvent applicationEvent = Mockito.mock(ContextClosedEvent.class); + @Rule public ExpectedException thrown = ExpectedException.none(); @@ -1901,4 +1910,56 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Assert.assertEquals(0.0, executable.getProgress(), 0); Assert.assertEquals("", executable.getRelatedSegment()); } + + @Test + public void tstOnApplicationEvent() { + final String PROJECT1 = "test1"; + final String PROJECT2 = "test2"; + final String PROJECT3 = "test3"; + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + NExecutableManager manager1 = NExecutableManager.getInstance(kylinConfig, PROJECT2); + + Epoch e1 = new Epoch(); + e1.setEpochTarget(PROJECT1); + e1.setCurrentEpochOwner("owner1"); + + Epoch e2 = new Epoch(); + e2.setEpochTarget(PROJECT2); + e2.setCurrentEpochOwner("owner2"); + + Epoch e3 = new Epoch(); + e3.setEpochTarget(PROJECT3); + e3.setCurrentEpochOwner("owner2"); + + try { + EpochStore.getEpochStore(kylinConfig).insertBatch(Arrays.asList(e1, e2, e3)); + } catch (Exception e) { + throw new RuntimeException("cannnot init epoch store!"); + } + + EpochManager epochManager = EpochManager.getInstance(); + epochManager.setIdentity("owner2"); + + val job = new DefaultExecutable(); + job.setProject(PROJECT2); + job.setJobType(JobTypeEnum.INDEX_BUILD); + + val executable1 = new SucceedDagTestExecutable(); + executable1.setProject(PROJECT2); + job.addTask(executable1); + + val executable2 = new SucceedDagTestExecutable(); + executable2.setProject(PROJECT2); + job.addTask(executable2); + + executable1.setNextSteps(Sets.newHashSet(executable2.getId())); + executable2.setPreviousStep(executable1.getId()); + + val executablePO = NExecutableManager.toPO(job, PROJECT2); + manager1.addJob(executablePO); + manager1.updateJobOutput(job.getId(), ExecutableState.RUNNING); + + 
jobService.onApplicationEvent(applicationEvent); + } + } diff --git a/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java b/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java index c18973ed27..474ccad787 100644 --- a/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java +++ b/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java @@ -38,7 +38,6 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient; import org.springframework.cloud.zookeeper.discovery.ZookeeperInstance; import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ImportResource; import org.springframework.context.event.ContextClosedEvent; @@ -60,7 +59,7 @@ import lombok.val; @EnableDiscoveryClient @LoadBalancerClient(name = "spring-boot-provider", configuration = org.apache.kylin.rest.LoadBalanced.class) @EnableSpringHttpSession -public class BootstrapServer implements ApplicationListener<ApplicationEvent> { +public class BootstrapServer implements ISmartApplicationListenerForSystem { private static final Logger logger = LoggerFactory.getLogger(BootstrapServer.class); @@ -135,4 +134,10 @@ public class BootstrapServer implements ApplicationListener<ApplicationEvent> { EpochManager.getInstance().releaseOwnedEpochs(); } } + + @Override + public int getOrder() { + return LOWEST_PRECEDENCE; + } + }