This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 0408501504 KYLIN-5308 add transaction timeout when epoch renew
0408501504 is described below

commit 0408501504b954f731017a48b2ef76f41e9b9f8e
Author: Jiale He <jiale...@kyligence.io>
AuthorDate: Thu Oct 13 09:53:40 2022 +0800

    KYLIN-5308 add transaction timeout when epoch renew
    
    Co-authored-by: Jiale He <jialeheb...@gmail.com>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../common/persistence/metadata/EpochStore.java    |   5 +-
 .../persistence/metadata/FileEpochStore.java       |   5 +
 .../persistence/metadata/JdbcAuditLogStore.java    |   2 +-
 .../persistence/metadata/JdbcEpochStore.java       |   6 +
 .../common/persistence/metadata/jdbc/JdbcUtil.java |  12 +-
 .../apache/kylin/common/KylinConfigBaseTest.java   |  12 ++
 .../epochstore/AbstractEpochStoreTest.java         | 132 +++++++--------------
 .../metadata/epochstore/JdbcEpochStoreTest.java    |  65 +++++++---
 .../apache/kylin/metadata/epoch/EpochManager.java  |  17 ++-
 .../kylin/metadata/epoch/EpochManagerTest.java     |  57 ++++++++-
 11 files changed, 197 insertions(+), 120 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3b94b3b024..8498eba0e6 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2738,6 +2738,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Long.parseLong(getOptional("kylin.server.leader-race.heart-beat-interval", 
"30"));
     }
 
+    public double getEpochRenewTimeoutRate() {
+        return 
Double.parseDouble(getOptional("kylin.server.leader-race.heart-beat-timeout-rate",
 "0.8"));
+    }
+
     public long getDiscoveryClientTimeoutThreshold() {
         return 
TimeUtil.timeStringAs(getOptional("kylin.server.discovery-client-timeout-threshold",
 "3s"),
                 TimeUnit.SECONDS);
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
index dde480d2bc..388485b789 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
@@ -20,11 +20,12 @@ package org.apache.kylin.common.persistence.metadata;
 import java.util.List;
 import java.util.Objects;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.Singletons;
 import org.apache.kylin.common.util.AddressUtil;
 
+import lombok.extern.slf4j.Slf4j;
+
 @Slf4j
 public abstract class EpochStore {
     public static final String EPOCH_SUFFIX = "_epoch";
@@ -54,6 +55,8 @@ public abstract class EpochStore {
 
     public abstract <T> T executeWithTransaction(Callback<T> callback);
 
+    public abstract <T> T executeWithTransaction(Callback<T> callback, int 
timeout);
+
     public Epoch getGlobalEpoch() {
         return getEpoch("_global");
     }
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
index b18f4c5be4..b90a53b35e 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
@@ -157,4 +157,9 @@ public class FileEpochStore extends EpochStore {
 
         return null;
     }
+
+    @Override
+    public <T> T executeWithTransaction(Callback<T> callback, int timeout) {
+        return executeWithTransaction(callback);
+    }
 }
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
index ec2081bd14..80af263876 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
@@ -200,7 +200,7 @@ public class JdbcAuditLogStore implements AuditLogStore {
                             }
                             return null;
                         
}).filter(Objects::nonNull).collect(Collectors.toList())),
-                TransactionDefinition.ISOLATION_REPEATABLE_READ, beforeCommit);
+                TransactionDefinition.ISOLATION_REPEATABLE_READ, beforeCommit, 
TransactionDefinition.TIMEOUT_DEFAULT);
     }
 
     public void batchInsert(List<AuditLog> auditLogs) {
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
index be32feb19e..bb444822ac 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
@@ -21,6 +21,7 @@ import static 
org.apache.kylin.common.exception.CommonErrorCode.FAILED_UPDATE_ME
 import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters;
 import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.isTableExists;
 import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.withTransaction;
+import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.withTransactionTimeout;
 
 import java.io.InputStream;
 import java.sql.PreparedStatement;
@@ -266,4 +267,9 @@ public class JdbcEpochStore extends EpochStore {
     public <T> T executeWithTransaction(Callback<T> callback) {
         return withTransaction(transactionManager, callback::handle);
     }
+
+    @Override
+    public <T> T executeWithTransaction(Callback<T> callback, int timeout) {
+        return withTransactionTimeout(transactionManager, callback::handle, 
timeout);
+    }
 }
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
index 7d74e8851d..94734b415e 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
@@ -53,19 +53,27 @@ public class JdbcUtil {
 
     private static final Logger logger = 
LoggerFactory.getLogger(JdbcUtil.class);
 
+    public static <T> T withTransactionTimeout(DataSourceTransactionManager 
transactionManager, Callback<T> consumer,
+            int timeout) {
+        return withTransaction(transactionManager, consumer, 
TransactionDefinition.ISOLATION_REPEATABLE_READ, null,
+                timeout);
+    }
+
     public static <T> T withTransaction(DataSourceTransactionManager 
transactionManager, Callback<T> consumer) {
         return withTransaction(transactionManager, consumer, 
TransactionDefinition.ISOLATION_REPEATABLE_READ);
     }
 
     public static <T> T withTransaction(DataSourceTransactionManager 
transactionManager, Callback<T> consumer,
             int isolationLevel) {
-        return withTransaction(transactionManager, consumer, isolationLevel, 
null);
+        return withTransaction(transactionManager, consumer, isolationLevel, 
null,
+                TransactionDefinition.TIMEOUT_DEFAULT);
     }
 
     public static <T> T withTransaction(DataSourceTransactionManager 
transactionManager, Callback<T> consumer,
-            int isolationLevel, Callback<T> beforeCommit) {
+            int isolationLevel, Callback<T> beforeCommit, int timeout) {
         val definition = new DefaultTransactionDefinition();
         definition.setIsolationLevel(isolationLevel);
+        definition.setTimeout(timeout);
         val status = transactionManager.getTransaction(definition);
         try {
             T result = consumer.handle();
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index f927bd4e80..903fb8f6a0 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1359,6 +1359,18 @@ class KylinConfigBaseTest {
         // Reset to prevent impacting other tests
         config.setProperty(WRITING_CLUSTER_WORKING_DIR, "");
     }
+
+    @Test
+    void testGetEpochRenewTimeoutRate() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        Assertions.assertEquals(0.8, config.getEpochRenewTimeoutRate());
+        config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate", 
"0.0");
+        Assertions.assertEquals(0.0, config.getEpochRenewTimeoutRate());
+        config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate", 
"0");
+        Assertions.assertEquals(0.0, config.getEpochRenewTimeoutRate());
+        config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate", 
"1");
+        Assertions.assertEquals(1.0, config.getEpochRenewTimeoutRate());
+    }
 }
 
 class EnvironmentUpdateUtils {
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
index 9cc3f9e84e..02f3b05363 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
@@ -27,10 +27,11 @@ import 
org.apache.kylin.common.persistence.metadata.EpochStore;
 import org.apache.kylin.common.persistence.metadata.JdbcEpochStore;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.junit.annotation.MetadataInfo;
-import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.Lists;
+
 import lombok.val;
 
 @MetadataInfo(onlyProps = true)
@@ -56,152 +57,107 @@ public abstract class AbstractEpochStoreTest {
     @Test
     public void testInsertAndUpdate() {
 
-        Epoch newSaveEpoch = new Epoch();
-        newSaveEpoch.setEpochTarget("test1");
-        newSaveEpoch.setCurrentEpochOwner("owner1");
-        newSaveEpoch.setEpochId(1);
-        newSaveEpoch.setLastEpochRenewTime(System.currentTimeMillis());
-
+        Epoch mockEpoch = getMockEpoch("test1", "owner1");
         //insert one
-        epochStore.insert(newSaveEpoch);
-        val epochs = epochStore.list();
-        Assert.assertEquals(epochs.size(), 1);
+        epochStore.insert(mockEpoch);
 
-        Assert.assertTrue(compareEpoch(newSaveEpoch, epochs.get(0)));
+        val epochs = epochStore.list();
+        Assertions.assertEquals(1, epochs.size());
+        Assertions.assertTrue(compareEpoch(mockEpoch, epochs.get(0)));
 
         //update owner
-        newSaveEpoch.setCurrentEpochOwner("o2");
-        epochStore.update(newSaveEpoch);
-
-        Assert.assertEquals(newSaveEpoch.getCurrentEpochOwner(), 
epochStore.list().get(0).getCurrentEpochOwner());
+        mockEpoch.setCurrentEpochOwner("o2");
+        epochStore.update(mockEpoch);
 
+        Assertions.assertEquals(mockEpoch.getCurrentEpochOwner(), 
epochStore.list().get(0).getCurrentEpochOwner());
     }
 
     @Test
     public void testExecuteWithTransaction_Success() {
 
-        Epoch e1 = new Epoch();
-        e1.setEpochTarget("test1");
-        e1.setCurrentEpochOwner("owner1");
-        e1.setEpochId(1);
-        e1.setLastEpochRenewTime(System.currentTimeMillis());
-
+        Epoch mockEpoch = getMockEpoch("test1", "owner1");
         epochStore.executeWithTransaction(() -> {
-            epochStore.insert(e1);
-
+            epochStore.insert(mockEpoch);
             //insert success
-            Assert.assertEquals(epochStore.list().size(), 1);
-            Assert.assertTrue(compareEpoch(e1, epochStore.list().get(0)));
-
+            Assertions.assertEquals(1, epochStore.list().size());
+            Assertions.assertTrue(compareEpoch(mockEpoch, 
epochStore.list().get(0)));
             return null;
         });
-
     }
 
     @Test
     public void testBatchUpdate() {
-        Epoch e1 = new Epoch();
-        e1.setEpochTarget("test1");
-        e1.setCurrentEpochOwner("owner1");
-        e1.setEpochId(1);
-        e1.setLastEpochRenewTime(System.currentTimeMillis());
 
-        epochStore.insert(e1);
+        Epoch e1 = getMockEpoch("test1", "owner1");
+        Epoch e2 = getMockEpoch("test2", "owner2");
 
-        Epoch e2 = new Epoch();
-        e2.setEpochTarget("test2");
-        e2.setCurrentEpochOwner("owner2");
-        e2.setEpochId(1);
-        e2.setLastEpochRenewTime(System.currentTimeMillis());
+        epochStore.insert(e1);
         epochStore.insert(e2);
 
-        val batchEpochs = Arrays.asList(e1, e2);
-
+        val batchEpochs = Lists.newArrayList(e1, e2);
         epochStore.updateBatch(batchEpochs);
-
-        batchEpochs.forEach(epoch -> {
-            Assert.assertTrue(compareEpoch(epoch, 
epochStore.getEpoch(epoch.getEpochTarget())));
-        });
-
+        batchEpochs.forEach(
+                epoch -> Assertions.assertTrue(compareEpoch(epoch, 
epochStore.getEpoch(epoch.getEpochTarget()))));
     }
 
     @Test
     public void testBatchUpdateWithError() {
-        Epoch e1 = new Epoch();
-        e1.setEpochTarget("test1");
-        e1.setCurrentEpochOwner("owner1");
-        e1.setEpochId(1);
-        e1.setLastEpochRenewTime(System.currentTimeMillis());
 
-        epochStore.insert(e1);
+        Epoch e1 = getMockEpoch("test1", "owner1");
+        Epoch e2 = getMockEpoch("test2", "owner2");
 
-        Epoch e2 = new Epoch();
-        e2.setEpochTarget("test2");
-        e2.setCurrentEpochOwner("owner2");
-        e2.setEpochId(1);
-        e2.setLastEpochRenewTime(System.currentTimeMillis());
+        epochStore.insert(e1);
 
-        val batchEpochs = Arrays.asList(e1, e2);
         boolean isError = false;
         try {
-            epochStore.updateBatch(batchEpochs);
+            epochStore.updateBatch(Lists.newArrayList(e1, e2));
         } catch (Exception e) {
             isError = true;
         }
         if (epochStore instanceof JdbcEpochStore) {
-            Assert.assertTrue(isError);
+            Assertions.assertTrue(isError);
         }
     }
 
     @Test
     public void testBatchInsert() {
-        Epoch e1 = new Epoch();
-        e1.setEpochTarget("test1");
-        e1.setCurrentEpochOwner("owner1");
-        e1.setEpochId(1);
-        e1.setLastEpochRenewTime(System.currentTimeMillis());
-
-        Epoch e2 = new Epoch();
-        e2.setEpochTarget("test2");
-        e2.setCurrentEpochOwner("owner2");
-        e2.setEpochId(1);
-        e2.setLastEpochRenewTime(System.currentTimeMillis());
+        Epoch e1 = getMockEpoch("test1", "owner1");
+        Epoch e2 = getMockEpoch("test2", "owner2");
 
         val batchEpochs = Arrays.asList(e1, e2);
-
         epochStore.insertBatch(batchEpochs);
-
-        batchEpochs.forEach(epoch -> {
-            Assert.assertTrue(compareEpoch(epoch, 
epochStore.getEpoch(epoch.getEpochTarget())));
-        });
-
+        batchEpochs.forEach(
+                epoch -> Assertions.assertTrue(compareEpoch(epoch, 
epochStore.getEpoch(epoch.getEpochTarget()))));
     }
 
     @Test
     public void testIsLeaderNodeWithCurrentEpochOwnerNull() {
-        Epoch epoch = new Epoch();
-        epoch.setEpochTarget("_global");
-        epoch.setCurrentEpochOwner(null);
-        epochStore.insert(epoch);
+        Epoch mockEpoch = getMockEpoch("_global", null);
+        epochStore.insert(mockEpoch);
         Assertions.assertFalse(EpochStore.isLeaderNode());
     }
 
     @Test
     public void testIsLeaderNodeWithServiceInfoNotEqual() {
-        Epoch epoch = new Epoch();
-        epoch.setEpochTarget("_global");
-        epoch.setCurrentEpochOwner("owner1");
-        epochStore.insert(epoch);
+        Epoch mockEpoch = getMockEpoch("_global", "owner1");
+        epochStore.insert(mockEpoch);
         Assertions.assertFalse(EpochStore.isLeaderNode());
     }
 
     @Test
     public void testIsLeaderNodeWithServiceInfoEqual() {
         Assertions.assertFalse(EpochStore.isLeaderNode());
-        Epoch epoch = new Epoch();
-        epoch.setEpochTarget("_global");
-        epoch.setCurrentEpochOwner(AddressUtil.getLocalInstance() + "|" + 
Long.MAX_VALUE);
-        epochStore.insert(epoch);
+        Epoch mockEpoch = getMockEpoch("_global", 
AddressUtil.getLocalInstance() + "|" + Long.MAX_VALUE);
+        epochStore.insert(mockEpoch);
         Assertions.assertTrue(EpochStore.isLeaderNode());
     }
+
+    protected Epoch getMockEpoch(String epochTarget, String epochOwner) {
+        Epoch epoch = new Epoch();
+        epoch.setEpochTarget(epochTarget);
+        epoch.setCurrentEpochOwner(epochOwner);
+        epoch.setEpochId(1);
+        epoch.setLastEpochRenewTime(System.currentTimeMillis());
+        return epoch;
+    }
 }
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
index 06e1074264..d51c15ec66 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
@@ -19,17 +19,22 @@ package 
org.apache.kylin.common.persistence.metadata.epochstore;
 
 import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters;
 import static org.apache.kylin.common.util.TestUtils.getTestConfig;
+import static org.awaitility.Awaitility.await;
+
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.dbcp2.BasicDataSourceFactory;
 import org.apache.kylin.common.persistence.metadata.Epoch;
 import org.apache.kylin.junit.annotation.OverwriteProp;
-import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.TransactionException;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
 
 import lombok.val;
 
@@ -55,34 +60,62 @@ public final class JdbcEpochStoreTest extends 
AbstractEpochStoreTest {
     }
 
     @Test
-    public void testExecuteWithTransaction_RollBack() {
-
-        Epoch e1 = new Epoch();
-        e1.setEpochTarget("test1");
-        e1.setCurrentEpochOwner("owner1");
-        e1.setEpochId(1);
-        e1.setLastEpochRenewTime(System.currentTimeMillis());
+    void testExecuteWithTransaction_RollBack() {
 
+        Epoch mockEpoch = getMockEpoch("test1", "owner1");
         try {
             epochStore.executeWithTransaction(() -> {
-                epochStore.insert(e1);
-
                 //insert success
-                Assert.assertEquals(epochStore.list().size(), 1);
-                Assert.assertTrue(compareEpoch(e1, epochStore.list().get(0)));
+                epochStore.insert(mockEpoch);
+                Assertions.assertEquals(1, epochStore.list().size());
+                Assertions.assertTrue(compareEpoch(mockEpoch, 
epochStore.list().get(0)));
 
                 if (epochStore.list().size() == 1) {
                     throw new RuntimeException("mock transaction error");
                 }
-
                 return null;
             });
+            Assertions.fail();
+        } catch (RuntimeException e) {
+            Assertions.assertEquals("mock transaction error", 
Throwables.getRootCause(e).getMessage());
+            Assertions.assertEquals(0, epochStore.list().size());
+        }
+    }
 
-            Assert.fail();
+    @Test
+    void testExecuteWithTransactionTimeout_RollBack() {
+
+        Epoch mockEpoch = getMockEpoch("test1", "owner1");
+        // before transaction
+        Assertions.assertEquals(0, epochStore.list().size());
+        try {
+            epochStore.executeWithTransaction(() -> {
+                // mock transaction timeout
+                await().pollDelay(1100, TimeUnit.MILLISECONDS).until(() -> 
true);
+                epochStore.insertBatch(Lists.newArrayList(mockEpoch));
+                return null;
+            }, 1);
+            Assertions.fail();
         } catch (RuntimeException e) {
-            Assert.assertEquals(Throwables.getRootCause(e).getMessage(), "mock 
transaction error");
-            Assert.assertEquals(epochStore.list().size(), 0);
+            Throwable rootCause = Throwables.getRootCause(e);
+            Assertions.assertTrue(rootCause instanceof TransactionException);
+            Assertions.assertTrue(rootCause.getMessage().contains("Transaction 
timed out"));
         }
+        // rollback result
+        Assertions.assertEquals(0, epochStore.list().size());
+    }
 
+    @Test
+    void testExecuteWithTransactionTimeoutSuccess() {
+
+        Epoch mockEpoch = getMockEpoch("test1", "owner1");
+        Assertions.assertEquals(0, epochStore.list().size());
+        epochStore.executeWithTransaction(() -> {
+            await().pollDelay(1100, TimeUnit.MILLISECONDS).until(() -> true);
+            epochStore.insertBatch(Lists.newArrayList(mockEpoch));
+            Assertions.assertEquals(1, epochStore.list().size());
+            return null;
+        });
+        Assertions.assertEquals(1, epochStore.list().size());
     }
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
index 1dff56cbcf..bb7f2c3d2f 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.metadata.epoch;
 
 import static org.apache.kylin.common.util.AddressUtil.MAINTAIN_MODE_MOCK_PORT;
-import static 
org.apache.kylin.metadata.epoch.EpochUpdateLockManager.executeEpochWithLock;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,9 +45,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.Singletons;
-import org.apache.kylin.common.util.NamedThreadFactory;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.common.persistence.metadata.Epoch;
 import org.apache.kylin.common.persistence.metadata.EpochStore;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -58,7 +54,10 @@ import 
org.apache.kylin.common.scheduler.ProjectControlledNotifier;
 import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
 import org.apache.kylin.common.scheduler.SourceUsageVerifyNotifier;
 import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.common.util.NamedThreadFactory;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +96,7 @@ public class EpochManager {
     private final String serverMode;
     private final boolean epochCheckEnabled;
     private final long epochExpiredTime;
+    private final int epochRenewTimeout;
 
     @Getter
     private final EpochUpdateManager epochUpdateManager;
@@ -113,8 +113,13 @@ public class EpochManager {
         this.serverMode = config.getServerMode();
         this.epochCheckEnabled = config.getEpochCheckerEnabled();
         this.epochExpiredTime = config.getEpochExpireTimeSecond();
+        this.epochRenewTimeout = getEpochRenewTimeout();
         this.epochUpdateManager = new EpochUpdateManager();
+    }
 
+    private int getEpochRenewTimeout() {
+        double timeoutRate = config.getEpochRenewTimeoutRate(); // rate <= 0 falls back to the full expire time
+        return Math.max(1, (int) (epochExpiredTime * (timeoutRate <= 0 ? 1 : timeoutRate))); // never truncate to 0s
     }
 
     public class EpochUpdateManager {
@@ -260,7 +265,7 @@ public class EpochManager {
             });
 
             try {
-                if (!countDownLatch.await(epochExpiredTime, TimeUnit.SECONDS)) 
{
+                if (!countDownLatch.await(epochRenewTimeout, 
TimeUnit.SECONDS)) {
                     logger.error("renew not finished,{}/{}...", 
newRenewEpochSets.size(), oriEpochs.size());
                 }
             } catch (InterruptedException e) {
@@ -449,7 +454,7 @@ public class EpochManager {
                 epochStore.updateBatch(needUpdateEpochList);
             }
             return null;
-        });
+        }, epochRenewTimeout);
     }
 
     /**
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
index 8e8d0ac2f3..164d952020 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
@@ -227,9 +227,7 @@ class EpochManagerTest {
     void testUpdateProjectEpochWithResourceGroupEnabled() {
         val manager = ResourceGroupManager.getInstance(getTestConfig());
         manager.getResourceGroup();
-        manager.updateResourceGroup(copyForWrite -> {
-            copyForWrite.setResourceGroupEnabled(true);
-        });
+        manager.updateResourceGroup(copyForWrite -> 
copyForWrite.setResourceGroupEnabled(true));
         EpochManager epochManager = EpochManager.getInstance();
         val prjMgr = NProjectManager.getInstance(getTestConfig());
         for (ProjectInstance prj : prjMgr.listAllProjects()) {
@@ -244,9 +242,7 @@ class EpochManagerTest {
     @Test
     void testGetEpochOwnerWithException() {
         EpochManager epochManager = EpochManager.getInstance();
-        Assertions.assertThrows(IllegalStateException.class, () -> {
-            epochManager.getEpochOwner(null);
-        });
+        Assertions.assertThrows(IllegalStateException.class, () -> 
epochManager.getEpochOwner(null));
     }
 
     @Test
@@ -392,6 +388,55 @@ class EpochManagerTest {
 
     }
 
+    @Test
+    void testEpochRenewTimeoutDefault() {
+        KylinConfig config = getTestConfig();
+        double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate();
+        Assertions.assertEquals(0.8, epochRenewTimeoutRate);
+        EpochManager manager = EpochManager.getInstance();
+        Object epochExpiredTime = ReflectionTestUtils.getField(manager, 
"epochExpiredTime");
+        Assertions.assertNotNull(epochExpiredTime);
+        Assertions.assertEquals(60, (long) epochExpiredTime);
+
+        Object epochRenewTimeout = ReflectionTestUtils.getField(manager, 
"epochRenewTimeout");
+        Assertions.assertNotNull(epochRenewTimeout);
+        Assertions.assertEquals(60 * epochRenewTimeoutRate, (int) 
epochRenewTimeout);
+    }
+
+    @Test
+    @OverwriteProp(key = "kylin.server.leader-race.heart-beat-timeout-rate", 
value = "0.0")
+    void testEpochRenewTimeoutOverride1() {
+        KylinConfig config = getTestConfig();
+        double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate();
+        Assertions.assertEquals(0.0, epochRenewTimeoutRate);
+
+        EpochManager manager = EpochManager.getInstance();
+        Object epochExpiredTime = ReflectionTestUtils.getField(manager, 
"epochExpiredTime");
+        Assertions.assertNotNull(epochExpiredTime);
+        Assertions.assertEquals(60, (long) epochExpiredTime);
+
+        Object epochRenewTimeout = ReflectionTestUtils.getField(manager, 
"epochRenewTimeout");
+        Assertions.assertNotNull(epochRenewTimeout);
+        Assertions.assertEquals(60, (int) epochRenewTimeout);
+    }
+
+    @Test
+    @OverwriteProp(key = "kylin.server.leader-race.heart-beat-timeout-rate", 
value = "1.5")
+    void testEpochRenewTimeoutOverride2() {
+        KylinConfig config = getTestConfig();
+        double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate();
+        Assertions.assertEquals(1.5, epochRenewTimeoutRate);
+
+        EpochManager manager = EpochManager.getInstance();
+        Object epochExpiredTime = ReflectionTestUtils.getField(manager, 
"epochExpiredTime");
+        Assertions.assertNotNull(epochExpiredTime);
+        Assertions.assertEquals(60, (long) epochExpiredTime);
+
+        Object epochRenewTimeout = ReflectionTestUtils.getField(manager, 
"epochRenewTimeout");
+        Assertions.assertNotNull(epochRenewTimeout);
+        Assertions.assertEquals(60 * epochRenewTimeoutRate, (int) 
epochRenewTimeout);
+    }
+
     EpochStore getEpochStore() {
         try {
             return EpochStore.getEpochStore(getTestConfig());

Reply via email to