This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c0290399fcd514b05aafdc792dd01b9773176f06
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Tue Nov 1 15:51:55 2022 +0800

    KYLIN-5368 Fix when stop ke job on yarn are not be killed for yarn cluster 
mode
---
 .../rest/ISmartApplicationListenerForSystem.java   | 32 ++++++++++++
 .../apache/kylin/metadata/epoch/EpochManager.java  |  5 ++
 .../kylin/metadata/epoch/EpochManagerTest.java     | 22 ++++++++
 .../org/apache/kylin/rest/service/JobService.java  | 34 +++++++++++-
 .../apache/kylin/rest/service/JobServiceTest.java  | 61 ++++++++++++++++++++++
 .../org/apache/kylin/rest/BootstrapServer.java     |  9 +++-
 6 files changed, 160 insertions(+), 3 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/ISmartApplicationListenerForSystem.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/ISmartApplicationListenerForSystem.java
new file mode 100644
index 0000000000..8fc5f026e7
--- /dev/null
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/ISmartApplicationListenerForSystem.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest;
+
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.SmartApplicationListener;
+
+public interface ISmartApplicationListenerForSystem extends 
SmartApplicationListener {
+
+    @Override
+    default boolean supportsEventType(Class<? extends ApplicationEvent> 
eventType) {
+        return eventType == ApplicationReadyEvent.class || eventType == 
ContextClosedEvent.class;
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
index bb7f2c3d2f..cbe1ba9590 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
@@ -814,4 +814,9 @@ public class EpochManager {
             return null;
         });
     }
+
+    public List<Epoch> getOwnedEpochs() {
+        return 
epochStore.list().stream().filter(this::checkEpochOwnerOnly).collect(Collectors.toList());
+    }
+
 }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
index 164d952020..d488dfe67b 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
@@ -214,6 +214,28 @@ class EpochManagerTest {
 
     }
 
+    @Test
+    void testGetOwnedEpochs() {
+        Epoch e1 = new Epoch();
+        e1.setEpochTarget("test1");
+        e1.setCurrentEpochOwner("owner1");
+
+        Epoch e2 = new Epoch();
+        e2.setEpochTarget("test2");
+        e2.setCurrentEpochOwner("owner2");
+
+        Epoch e3 = new Epoch();
+        e3.setEpochTarget("test3");
+        e3.setCurrentEpochOwner("owner2");
+
+        getEpochStore().insertBatch(Arrays.asList(e1, e2, e3));
+
+        EpochManager epochManager = EpochManager.getInstance();
+        epochManager.setIdentity("owner2");
+
+        Assertions.assertEquals(2, epochManager.getOwnedEpochs().size());
+    }
+
     @Test
     void testForceUpdateEpoch() {
         EpochManager epochManager = EpochManager.getInstance();
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 08ef663424..22424681c5 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -47,6 +47,7 @@ import java.util.stream.Stream;
 
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.IOUtils;
@@ -66,6 +67,7 @@ import org.apache.kylin.common.metrics.MetricsGroup;
 import org.apache.kylin.common.metrics.MetricsName;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.persistence.metadata.Epoch;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.persistence.transaction.UnitOfWorkContext;
 import org.apache.kylin.common.scheduler.EventBusFactory;
@@ -107,6 +109,7 @@ import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.ISmartApplicationListenerForSystem;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.JobFilter;
 import org.apache.kylin.rest.request.JobUpdateRequest;
@@ -122,6 +125,8 @@ import org.apache.kylin.rest.util.SparkHistoryUIUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.http.HttpHeaders;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
@@ -141,7 +146,7 @@ import lombok.val;
 import lombok.var;
 
 @Component("jobService")
-public class JobService extends BasicService implements JobSupporter {
+public class JobService extends BasicService implements JobSupporter, 
ISmartApplicationListenerForSystem {
 
     @Autowired
     private ProjectService projectService;
@@ -1352,6 +1357,33 @@ public class JobService extends BasicService implements 
JobSupporter {
             MsgPicker.setMsg("en");
         }
     }
+
+    @Override
+    public void onApplicationEvent(ApplicationEvent event) {
+        if (event instanceof ContextClosedEvent) {
+            logger.info("Stop kyligence node, kill job on yarn for yarn 
cluster mode");
+            EpochManager epochManager = EpochManager.getInstance();
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            List<Epoch> ownedEpochs = epochManager.getOwnedEpochs();
+
+            for (Epoch epoch : ownedEpochs) {
+                String project = epoch.getEpochTarget();
+                NExecutableManager executableManager = 
NExecutableManager.getInstance(kylinConfig, project);
+                if (executableManager != null) {
+                    List<ExecutablePO> allJobs = 
executableManager.getAllJobs();
+                    for (ExecutablePO executablePO : allJobs) {
+                        executableManager.cancelRemoteJob(executablePO);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public int getOrder() {
+        return HIGHEST_PRECEDENCE;
+    }
+
     @Setter
     @Getter
     static class ExecutablePOSortBean {
diff --git 
a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
 
b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
index ee6db5a9be..71266afd4c 100644
--- 
a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
+++ 
b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java
@@ -57,11 +57,14 @@ import java.util.stream.Collectors;
 
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.persistence.metadata.Epoch;
+import org.apache.kylin.common.persistence.metadata.EpochStore;
 import org.apache.kylin.common.persistence.transaction.TransactionException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.ClassUtil;
@@ -89,6 +92,7 @@ import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.job.execution.StageBase;
 import org.apache.kylin.job.execution.SucceedChainedTestExecutable;
+import org.apache.kylin.job.execution.SucceedDagTestExecutable;
 import org.apache.kylin.job.execution.SucceedTestExecutable;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
@@ -125,7 +129,9 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.context.ApplicationEvent;
 import 
org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
+import org.springframework.context.event.ContextClosedEvent;
 import org.springframework.core.type.filter.AssignableTypeFilter;
 import org.springframework.http.HttpHeaders;
 import org.springframework.security.authentication.TestingAuthenticationToken;
@@ -177,6 +183,9 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase {
     @Mock
     private ProjectService projectService = Mockito.spy(ProjectService.class);
 
+    @Mock
+    private ApplicationEvent applicationEvent = 
Mockito.mock(ContextClosedEvent.class);
+
     @Rule
     public ExpectedException thrown = ExpectedException.none();
 
@@ -1901,4 +1910,56 @@ public class JobServiceTest extends 
NLocalFileMetadataTestCase {
         Assert.assertEquals(0.0, executable.getProgress(), 0);
         Assert.assertEquals("", executable.getRelatedSegment());
     }
+
+    @Test
+    public void tstOnApplicationEvent() {
+        final String PROJECT1 = "test1";
+        final String PROJECT2 = "test2";
+        final String PROJECT3 = "test3";
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        NExecutableManager manager1 = 
NExecutableManager.getInstance(kylinConfig, PROJECT2);
+
+        Epoch e1 = new Epoch();
+        e1.setEpochTarget(PROJECT1);
+        e1.setCurrentEpochOwner("owner1");
+
+        Epoch e2 = new Epoch();
+        e2.setEpochTarget(PROJECT2);
+        e2.setCurrentEpochOwner("owner2");
+
+        Epoch e3 = new Epoch();
+        e3.setEpochTarget(PROJECT3);
+        e3.setCurrentEpochOwner("owner2");
+
+        try {
+            
EpochStore.getEpochStore(kylinConfig).insertBatch(Arrays.asList(e1, e2, e3));
+        } catch (Exception e) {
+            throw new RuntimeException("cannnot init epoch store!");
+        }
+
+        EpochManager epochManager = EpochManager.getInstance();
+        epochManager.setIdentity("owner2");
+
+        val job = new DefaultExecutable();
+        job.setProject(PROJECT2);
+        job.setJobType(JobTypeEnum.INDEX_BUILD);
+
+        val executable1 = new SucceedDagTestExecutable();
+        executable1.setProject(PROJECT2);
+        job.addTask(executable1);
+
+        val executable2 = new SucceedDagTestExecutable();
+        executable2.setProject(PROJECT2);
+        job.addTask(executable2);
+
+        executable1.setNextSteps(Sets.newHashSet(executable2.getId()));
+        executable2.setPreviousStep(executable1.getId());
+
+        val executablePO = NExecutableManager.toPO(job, PROJECT2);
+        manager1.addJob(executablePO);
+        manager1.updateJobOutput(job.getId(), ExecutableState.RUNNING);
+
+        jobService.onApplicationEvent(applicationEvent);
+    }
+
 }
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java 
b/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java
index c18973ed27..474ccad787 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/BootstrapServer.java
@@ -38,7 +38,6 @@ import 
org.springframework.cloud.client.discovery.EnableDiscoveryClient;
 import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
 import org.springframework.cloud.zookeeper.discovery.ZookeeperInstance;
 import org.springframework.context.ApplicationEvent;
-import org.springframework.context.ApplicationListener;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ImportResource;
 import org.springframework.context.event.ContextClosedEvent;
@@ -60,7 +59,7 @@ import lombok.val;
 @EnableDiscoveryClient
 @LoadBalancerClient(name = "spring-boot-provider", configuration = 
org.apache.kylin.rest.LoadBalanced.class)
 @EnableSpringHttpSession
-public class BootstrapServer implements ApplicationListener<ApplicationEvent> {
+public class BootstrapServer implements ISmartApplicationListenerForSystem {
 
     private static final Logger logger = 
LoggerFactory.getLogger(BootstrapServer.class);
 
@@ -135,4 +134,10 @@ public class BootstrapServer implements 
ApplicationListener<ApplicationEvent> {
             EpochManager.getInstance().releaseOwnedEpochs();
         }
     }
+
+    @Override
+    public int getOrder() {
+        return LOWEST_PRECEDENCE;
+    }
+
 }

Reply via email to