This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit dea257ad3b1140d8f9b1b4c706739428ff6fae60 Author: Jiale He <jiale...@kyligence.io> AuthorDate: Thu Nov 3 20:14:04 2022 +0800 KYLIN-5344 Fix epoch update when epoch checker disabled --- .../org/apache/kylin/metadata/epoch/EpochOrchestrator.java | 8 +++++--- .../apache/kylin/metadata/epoch/EpochOrchestratorTest.java | 13 +++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java index 26c36824ae..631e2f5b33 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java @@ -70,13 +70,15 @@ public class EpochOrchestrator { private void startEpochChecker(KylinConfig kylinConfig) { // first renew and update epoch at org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache#createServiceCache + long pollSecond = kylinConfig.getEpochCheckerIntervalSecond(); + logger.info("Try to update/renew epoch every {} seconds", pollSecond); if (!kylinConfig.getEpochCheckerEnabled()) { // this logic can be used when there is only one All or Job KE node - logger.info("Disable epoch timing renew and update, renew and update epoch only once"); + logger.info("Disable epoch timing renew, renew epoch only once"); + checkerPool = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("EpochChecker")); + checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS); return; } - long pollSecond = kylinConfig.getEpochCheckerIntervalSecond(); - logger.info("Try to update epoch every {} seconds", pollSecond); logger.info("Renew executor work size is :{}", kylinConfig.getRenewEpochWorkerPoolSize()); checkerPool = Executors.newScheduledThreadPool(2, new NamedThreadFactory("EpochChecker")); checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS); diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java index af47f373e5..dd510cb84c 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java @@ -62,7 +62,16 @@ class EpochOrchestratorTest { val epochOrchestrator = new EpochOrchestrator(config); val obj = ReflectionTestUtils.getField(epochOrchestrator, "checkerPool"); - Assertions.assertTrue(Objects.isNull(obj)); - } + Assertions.assertTrue(Objects.nonNull(obj)); + Assertions.assertTrue(obj instanceof ScheduledExecutorService); + val pool = (ScheduledExecutorService) obj; + Object obj2 = ReflectionTestUtils.getField(pool, "e"); + Assertions.assertNotNull(obj2); + Assertions.assertTrue(obj2 instanceof ScheduledExecutorService); + ScheduledExecutorService executors = (ScheduledExecutorService) obj2; + Object obj3 = ReflectionTestUtils.getField(executors, "corePoolSize"); + Assertions.assertNotNull(obj3); + Assertions.assertEquals(1, ((Integer) obj3).intValue()); + } }