This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dea257ad3b1140d8f9b1b4c706739428ff6fae60
Author: Jiale He <jiale...@kyligence.io>
AuthorDate: Thu Nov 3 20:14:04 2022 +0800

    KYLIN-5344 Fix epoch update when epoch checker disabled
---
 .../org/apache/kylin/metadata/epoch/EpochOrchestrator.java  |  8 +++++---
 .../apache/kylin/metadata/epoch/EpochOrchestratorTest.java  | 13 +++++++++++--
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
index 26c36824ae..631e2f5b33 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
@@ -70,13 +70,15 @@ public class EpochOrchestrator {
 
     private void startEpochChecker(KylinConfig kylinConfig) {
         // first renew and update epoch at 
org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache#createServiceCache
+        long pollSecond = kylinConfig.getEpochCheckerIntervalSecond();
+        logger.info("Try to update/renew epoch every {} seconds", pollSecond);
         if (!kylinConfig.getEpochCheckerEnabled()) {
             // this logic can be used when there is only one All or Job KE node
-            logger.info("Disable epoch timing renew and update, renew and 
update epoch only once");
+            logger.info("Disable epoch timing renew, renew epoch only once");
+            checkerPool = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("EpochChecker"));
+            checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, 
pollSecond, TimeUnit.SECONDS);
             return;
         }
-        long pollSecond = kylinConfig.getEpochCheckerIntervalSecond();
-        logger.info("Try to update epoch every {} seconds", pollSecond);
         logger.info("Renew executor work size is :{}", 
kylinConfig.getRenewEpochWorkerPoolSize());
         checkerPool = Executors.newScheduledThreadPool(2, new 
NamedThreadFactory("EpochChecker"));
         checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, 
TimeUnit.SECONDS);
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java
index af47f373e5..dd510cb84c 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java
@@ -62,7 +62,16 @@ class EpochOrchestratorTest {
 
         val epochOrchestrator = new EpochOrchestrator(config);
         val obj = ReflectionTestUtils.getField(epochOrchestrator, 
"checkerPool");
-        Assertions.assertTrue(Objects.isNull(obj));
-    }
+        Assertions.assertTrue(Objects.nonNull(obj));
+        Assertions.assertTrue(obj instanceof ScheduledExecutorService);
 
+        val pool = (ScheduledExecutorService) obj;
+        Object obj2 = ReflectionTestUtils.getField(pool, "e");
+        Assertions.assertNotNull(obj2);
+        Assertions.assertTrue(obj2 instanceof ScheduledExecutorService);
+        ScheduledExecutorService executors = (ScheduledExecutorService) obj2;
+        Object obj3 = ReflectionTestUtils.getField(executors, "corePoolSize");
+        Assertions.assertNotNull(obj3);
+        Assertions.assertEquals(1, ((Integer) obj3).intValue());
+    }
 }

Reply via email to