This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new 0408501504 KYLIN-5308 add transaction timeout when epoch renew 0408501504 is described below commit 0408501504b954f731017a48b2ef76f41e9b9f8e Author: Jiale He <jiale...@kyligence.io> AuthorDate: Thu Oct 13 09:53:40 2022 +0800 KYLIN-5308 add transaction timeout when epoch renew Co-authored-by: Jiale He <jialeheb...@gmail.com> --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../common/persistence/metadata/EpochStore.java | 5 +- .../persistence/metadata/FileEpochStore.java | 5 + .../persistence/metadata/JdbcAuditLogStore.java | 2 +- .../persistence/metadata/JdbcEpochStore.java | 6 + .../common/persistence/metadata/jdbc/JdbcUtil.java | 12 +- .../apache/kylin/common/KylinConfigBaseTest.java | 12 ++ .../epochstore/AbstractEpochStoreTest.java | 132 +++++++-------------- .../metadata/epochstore/JdbcEpochStoreTest.java | 65 +++++++--- .../apache/kylin/metadata/epoch/EpochManager.java | 17 ++- .../kylin/metadata/epoch/EpochManagerTest.java | 57 ++++++++- 11 files changed, 197 insertions(+), 120 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3b94b3b024..8498eba0e6 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2738,6 +2738,10 @@ public abstract class KylinConfigBase implements Serializable { return Long.parseLong(getOptional("kylin.server.leader-race.heart-beat-interval", "30")); } + public double getEpochRenewTimeoutRate() { + return Double.parseDouble(getOptional("kylin.server.leader-race.heart-beat-timeout-rate", "0.8")); + } + public long getDiscoveryClientTimeoutThreshold() { return TimeUtil.timeStringAs(getOptional("kylin.server.discovery-client-timeout-threshold", "3s"), TimeUnit.SECONDS); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java index dde480d2bc..388485b789 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java @@ -20,11 +20,12 @@ package org.apache.kylin.common.persistence.metadata; import java.util.List; import java.util.Objects; -import lombok.extern.slf4j.Slf4j; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.Singletons; import org.apache.kylin.common.util.AddressUtil; +import lombok.extern.slf4j.Slf4j; + @Slf4j public abstract class EpochStore { public static final String EPOCH_SUFFIX = "_epoch"; @@ -54,6 +55,8 @@ public abstract class EpochStore { public abstract <T> T executeWithTransaction(Callback<T> callback); + public abstract <T> T executeWithTransaction(Callback<T> callback, int timeout); + public Epoch getGlobalEpoch() { return getEpoch("_global"); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java index b18f4c5be4..b90a53b35e 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java @@ -157,4 +157,9 @@ public class FileEpochStore extends EpochStore { return null; } + + @Override + public <T> T executeWithTransaction(Callback<T> callback, int timeout) { + return executeWithTransaction(callback); + } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java index ec2081bd14..80af263876 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java @@ -200,7 +200,7 @@ public class JdbcAuditLogStore implements AuditLogStore { } return null; }).filter(Objects::nonNull).collect(Collectors.toList())), - TransactionDefinition.ISOLATION_REPEATABLE_READ, beforeCommit); + TransactionDefinition.ISOLATION_REPEATABLE_READ, beforeCommit, TransactionDefinition.TIMEOUT_DEFAULT); } public void batchInsert(List<AuditLog> auditLogs) { diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java index be32feb19e..bb444822ac 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java @@ -21,6 +21,7 @@ import static org.apache.kylin.common.exception.CommonErrorCode.FAILED_UPDATE_ME import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters; import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.isTableExists; import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.withTransaction; +import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.withTransactionTimeout; import java.io.InputStream; import java.sql.PreparedStatement; @@ -266,4 +267,9 @@ public class JdbcEpochStore extends EpochStore { public <T> T executeWithTransaction(Callback<T> callback) { return withTransaction(transactionManager, callback::handle); } + + @Override + public <T> T executeWithTransaction(Callback<T> callback, int timeout) { + return withTransactionTimeout(transactionManager, callback::handle, timeout); + } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java index 7d74e8851d..94734b415e 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java @@ -53,19 +53,27 @@ public class JdbcUtil { private static final Logger logger = LoggerFactory.getLogger(JdbcUtil.class); + public static <T> T withTransactionTimeout(DataSourceTransactionManager transactionManager, Callback<T> consumer, + int timeout) { + return withTransaction(transactionManager, consumer, TransactionDefinition.ISOLATION_REPEATABLE_READ, null, + timeout); + } + public static <T> T withTransaction(DataSourceTransactionManager transactionManager, Callback<T> consumer) { return withTransaction(transactionManager, consumer, TransactionDefinition.ISOLATION_REPEATABLE_READ); } public static <T> T withTransaction(DataSourceTransactionManager transactionManager, Callback<T> consumer, int isolationLevel) { - return withTransaction(transactionManager, consumer, isolationLevel, null); + return withTransaction(transactionManager, consumer, isolationLevel, null, + TransactionDefinition.TIMEOUT_DEFAULT); } public static <T> T withTransaction(DataSourceTransactionManager transactionManager, Callback<T> consumer, - int isolationLevel, Callback<T> beforeCommit) { + int isolationLevel, Callback<T> beforeCommit, int timeout) { val definition = new DefaultTransactionDefinition(); definition.setIsolationLevel(isolationLevel); + definition.setTimeout(timeout); val status = transactionManager.getTransaction(definition); try { T result = consumer.handle(); diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index f927bd4e80..903fb8f6a0 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -1359,6 +1359,18 @@ class KylinConfigBaseTest { // Reset to prevent impacting other tests config.setProperty(WRITING_CLUSTER_WORKING_DIR, ""); } + + @Test + void testGetEpochRenewTimeoutRate() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + Assertions.assertEquals(0.8, config.getEpochRenewTimeoutRate()); + config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate", "0.0"); + Assertions.assertEquals(0.0, config.getEpochRenewTimeoutRate()); + config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate", "0"); + Assertions.assertEquals(0.0, config.getEpochRenewTimeoutRate()); + config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate", "1"); + Assertions.assertEquals(1.0, config.getEpochRenewTimeoutRate()); + } } class EnvironmentUpdateUtils { diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java index 9cc3f9e84e..02f3b05363 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java @@ -27,10 +27,11 @@ import org.apache.kylin.common.persistence.metadata.EpochStore; import org.apache.kylin.common.persistence.metadata.JdbcEpochStore; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.junit.annotation.MetadataInfo; -import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.google.common.collect.Lists; + import lombok.val; @MetadataInfo(onlyProps = true) @@ -56,152 +57,107 @@ public abstract class AbstractEpochStoreTest { @Test public void testInsertAndUpdate() { - Epoch newSaveEpoch = new Epoch(); - newSaveEpoch.setEpochTarget("test1"); - newSaveEpoch.setCurrentEpochOwner("owner1"); - newSaveEpoch.setEpochId(1); - newSaveEpoch.setLastEpochRenewTime(System.currentTimeMillis()); - + Epoch mockEpoch = getMockEpoch("test1", "owner1"); //insert one - epochStore.insert(newSaveEpoch); - val epochs = epochStore.list(); - Assert.assertEquals(epochs.size(), 1); + epochStore.insert(mockEpoch); - Assert.assertTrue(compareEpoch(newSaveEpoch, epochs.get(0))); + val epochs = epochStore.list(); + Assertions.assertEquals(1, epochs.size()); + Assertions.assertTrue(compareEpoch(mockEpoch, epochs.get(0))); //update owner - newSaveEpoch.setCurrentEpochOwner("o2"); - epochStore.update(newSaveEpoch); - - Assert.assertEquals(newSaveEpoch.getCurrentEpochOwner(), epochStore.list().get(0).getCurrentEpochOwner()); + mockEpoch.setCurrentEpochOwner("o2"); + epochStore.update(mockEpoch); + Assertions.assertEquals(mockEpoch.getCurrentEpochOwner(), epochStore.list().get(0).getCurrentEpochOwner()); } @Test public void testExecuteWithTransaction_Success() { - Epoch e1 = new Epoch(); - e1.setEpochTarget("test1"); - e1.setCurrentEpochOwner("owner1"); - e1.setEpochId(1); - e1.setLastEpochRenewTime(System.currentTimeMillis()); - + Epoch mockEpoch = getMockEpoch("test1", "owner1"); epochStore.executeWithTransaction(() -> { - epochStore.insert(e1); - + epochStore.insert(mockEpoch); //insert success - Assert.assertEquals(epochStore.list().size(), 1); - Assert.assertTrue(compareEpoch(e1, epochStore.list().get(0))); - + Assertions.assertEquals(1, epochStore.list().size()); + Assertions.assertTrue(compareEpoch(mockEpoch, epochStore.list().get(0))); return null; }); - } @Test public void testBatchUpdate() { - Epoch e1 = new Epoch(); - e1.setEpochTarget("test1"); - e1.setCurrentEpochOwner("owner1"); - e1.setEpochId(1); - e1.setLastEpochRenewTime(System.currentTimeMillis()); - epochStore.insert(e1); + Epoch e1 = getMockEpoch("test1", "owner1"); + Epoch e2 = getMockEpoch("test2", "owner2"); - Epoch e2 = new Epoch(); - e2.setEpochTarget("test2"); - e2.setCurrentEpochOwner("owner2"); - e2.setEpochId(1); - e2.setLastEpochRenewTime(System.currentTimeMillis()); + epochStore.insert(e1); epochStore.insert(e2); - val batchEpochs = Arrays.asList(e1, e2); - + val batchEpochs = Lists.newArrayList(e1, e2); epochStore.updateBatch(batchEpochs); - - batchEpochs.forEach(epoch -> { - Assert.assertTrue(compareEpoch(epoch, epochStore.getEpoch(epoch.getEpochTarget()))); - }); - + batchEpochs.forEach( + epoch -> Assertions.assertTrue(compareEpoch(epoch, epochStore.getEpoch(epoch.getEpochTarget())))); } @Test public void testBatchUpdateWithError() { - Epoch e1 = new Epoch(); - e1.setEpochTarget("test1"); - e1.setCurrentEpochOwner("owner1"); - e1.setEpochId(1); - e1.setLastEpochRenewTime(System.currentTimeMillis()); - epochStore.insert(e1); + Epoch e1 = getMockEpoch("test1", "owner1"); + Epoch e2 = getMockEpoch("test2", "owner2"); - Epoch e2 = new Epoch(); - e2.setEpochTarget("test2"); - e2.setCurrentEpochOwner("owner2"); - e2.setEpochId(1); - e2.setLastEpochRenewTime(System.currentTimeMillis()); + epochStore.insert(e1); - val batchEpochs = Arrays.asList(e1, e2); boolean isError = false; try { - epochStore.updateBatch(batchEpochs); + epochStore.updateBatch(Lists.newArrayList(e1, e2)); } catch (Exception e) { isError = true; } if (epochStore instanceof JdbcEpochStore) { - Assert.assertTrue(isError); + Assertions.assertTrue(isError); } } @Test public void testBatchInsert() { - Epoch e1 = new Epoch(); - e1.setEpochTarget("test1"); - e1.setCurrentEpochOwner("owner1"); - e1.setEpochId(1); - e1.setLastEpochRenewTime(System.currentTimeMillis()); - - Epoch e2 = new Epoch(); - e2.setEpochTarget("test2"); - e2.setCurrentEpochOwner("owner2"); - e2.setEpochId(1); - e2.setLastEpochRenewTime(System.currentTimeMillis()); + Epoch e1 = getMockEpoch("test1", "owner1"); + Epoch e2 = getMockEpoch("test2", "owner2"); val batchEpochs = Arrays.asList(e1, e2); - epochStore.insertBatch(batchEpochs); - - batchEpochs.forEach(epoch -> { - Assert.assertTrue(compareEpoch(epoch, epochStore.getEpoch(epoch.getEpochTarget()))); - }); - + batchEpochs.forEach( + epoch -> Assertions.assertTrue(compareEpoch(epoch, epochStore.getEpoch(epoch.getEpochTarget())))); } @Test public void testIsLeaderNodeWithCurrentEpochOwnerNull() { - Epoch epoch = new Epoch(); - epoch.setEpochTarget("_global"); - epoch.setCurrentEpochOwner(null); - epochStore.insert(epoch); + Epoch mockEpoch = getMockEpoch("_global", null); + epochStore.insert(mockEpoch); Assertions.assertFalse(EpochStore.isLeaderNode()); } @Test public void testIsLeaderNodeWithServiceInfoNotEqual() { - Epoch epoch = new Epoch(); - epoch.setEpochTarget("_global"); - epoch.setCurrentEpochOwner("owner1"); - epochStore.insert(epoch); + Epoch mockEpoch = getMockEpoch("_global", "owner1"); + epochStore.insert(mockEpoch); Assertions.assertFalse(EpochStore.isLeaderNode()); } @Test public void testIsLeaderNodeWithServiceInfoEqual() { Assertions.assertFalse(EpochStore.isLeaderNode()); - Epoch epoch = new Epoch(); - epoch.setEpochTarget("_global"); - epoch.setCurrentEpochOwner(AddressUtil.getLocalInstance() + "|" + Long.MAX_VALUE); - epochStore.insert(epoch); + Epoch mockEpoch = getMockEpoch("_global", AddressUtil.getLocalInstance() + "|" + Long.MAX_VALUE); + epochStore.insert(mockEpoch); Assertions.assertTrue(EpochStore.isLeaderNode()); } + + protected Epoch getMockEpoch(String epochTarget, String epochOwner) { + Epoch epoch = new Epoch(); + epoch.setEpochTarget(epochTarget); + epoch.setCurrentEpochOwner(epochOwner); + epoch.setEpochId(1); + epoch.setLastEpochRenewTime(System.currentTimeMillis()); + return epoch; + } } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java index 06e1074264..d51c15ec66 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java @@ -19,17 +19,22 @@ package org.apache.kylin.common.persistence.metadata.epochstore; import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters; import static org.apache.kylin.common.util.TestUtils.getTestConfig; +import static org.awaitility.Awaitility.await; + +import java.util.concurrent.TimeUnit; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.apache.kylin.common.persistence.metadata.Epoch; import org.apache.kylin.junit.annotation.OverwriteProp; -import org.junit.Assert; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.TransactionException; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import lombok.val; @@ -55,34 +60,62 @@ public final class JdbcEpochStoreTest extends AbstractEpochStoreTest { } @Test - public void testExecuteWithTransaction_RollBack() { - - Epoch e1 = new Epoch(); - e1.setEpochTarget("test1"); - e1.setCurrentEpochOwner("owner1"); - e1.setEpochId(1); - e1.setLastEpochRenewTime(System.currentTimeMillis()); + void testExecuteWithTransaction_RollBack() { + Epoch mockEpoch = getMockEpoch("test1", "owner1"); try { epochStore.executeWithTransaction(() -> { - epochStore.insert(e1); - //insert success - Assert.assertEquals(epochStore.list().size(), 1); - Assert.assertTrue(compareEpoch(e1, epochStore.list().get(0))); + epochStore.insert(mockEpoch); + Assertions.assertEquals(1, epochStore.list().size()); + Assertions.assertTrue(compareEpoch(mockEpoch, epochStore.list().get(0))); if (epochStore.list().size() == 1) { throw new RuntimeException("mock transaction error"); } - return null; }); + Assertions.fail(); + } catch (RuntimeException e) { + Assertions.assertEquals("mock transaction error", Throwables.getRootCause(e).getMessage()); + Assertions.assertEquals(0, epochStore.list().size()); + } + } - Assert.fail(); + @Test + void testExecuteWithTransactionTimeout_RollBack() { + + Epoch mockEpoch = getMockEpoch("test1", "owner1"); + // before transaction + Assertions.assertEquals(0, epochStore.list().size()); + try { + epochStore.executeWithTransaction(() -> { + // mock transaction timeout + await().pollDelay(1100, TimeUnit.MILLISECONDS).until(() -> true); + epochStore.insertBatch(Lists.newArrayList(mockEpoch)); + return null; + }, 1); + Assertions.fail(); } catch (RuntimeException e) { - Assert.assertEquals(Throwables.getRootCause(e).getMessage(), "mock transaction error"); - Assert.assertEquals(epochStore.list().size(), 0); + Throwable rootCause = Throwables.getRootCause(e); + Assertions.assertTrue(rootCause instanceof TransactionException); + Assertions.assertTrue(rootCause.getMessage().contains("Transaction timed out")); } + // rollback result + Assertions.assertEquals(0, epochStore.list().size()); + } + @Test + void testExecuteWithTransactionTimeoutSuccess() { + + Epoch mockEpoch = getMockEpoch("test1", "owner1"); + Assertions.assertEquals(0, epochStore.list().size()); + epochStore.executeWithTransaction(() -> { + await().pollDelay(1100, TimeUnit.MILLISECONDS).until(() -> true); + epochStore.insertBatch(Lists.newArrayList(mockEpoch)); + Assertions.assertEquals(1, epochStore.list().size()); + return null; + }); + Assertions.assertEquals(1, epochStore.list().size()); } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java index 1dff56cbcf..bb7f2c3d2f 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java @@ -19,7 +19,6 @@ package org.apache.kylin.metadata.epoch; import static org.apache.kylin.common.util.AddressUtil.MAINTAIN_MODE_MOCK_PORT; -import static org.apache.kylin.metadata.epoch.EpochUpdateLockManager.executeEpochWithLock; import java.util.ArrayList; import java.util.Collection; @@ -46,9 +45,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.Singletons; -import org.apache.kylin.common.util.NamedThreadFactory; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.common.persistence.metadata.Epoch; import org.apache.kylin.common.persistence.metadata.EpochStore; import org.apache.kylin.common.persistence.transaction.UnitOfWork; @@ -58,7 +54,10 @@ import org.apache.kylin.common.scheduler.ProjectControlledNotifier; import org.apache.kylin.common.scheduler.ProjectEscapedNotifier; import org.apache.kylin.common.scheduler.SourceUsageVerifyNotifier; import org.apache.kylin.common.util.AddressUtil; +import org.apache.kylin.common.util.NamedThreadFactory; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +96,7 @@ public class EpochManager { private final String serverMode; private final boolean epochCheckEnabled; private final long epochExpiredTime; + private final int epochRenewTimeout; @Getter private final EpochUpdateManager epochUpdateManager; @@ -113,8 +113,13 @@ public class EpochManager { this.serverMode = config.getServerMode(); this.epochCheckEnabled = config.getEpochCheckerEnabled(); this.epochExpiredTime = config.getEpochExpireTimeSecond(); + this.epochRenewTimeout = getEpochRenewTimeout(); this.epochUpdateManager = new EpochUpdateManager(); + } + private int getEpochRenewTimeout() { + double timeoutRate = config.getEpochRenewTimeoutRate() <= 0 ? 1 : config.getEpochRenewTimeoutRate(); + return (int) (epochExpiredTime * timeoutRate); } public class EpochUpdateManager { @@ -260,7 +265,7 @@ public class EpochManager { }); try { - if (!countDownLatch.await(epochExpiredTime, TimeUnit.SECONDS)) { + if (!countDownLatch.await(epochRenewTimeout, TimeUnit.SECONDS)) { logger.error("renew not finished,{}/{}...", newRenewEpochSets.size(), oriEpochs.size()); } } catch (InterruptedException e) { @@ -449,7 +454,7 @@ public class EpochManager { epochStore.updateBatch(needUpdateEpochList); } return null; - }); + }, epochRenewTimeout); } /** diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java index 8e8d0ac2f3..164d952020 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java @@ -227,9 +227,7 @@ class EpochManagerTest { void testUpdateProjectEpochWithResourceGroupEnabled() { val manager = ResourceGroupManager.getInstance(getTestConfig()); manager.getResourceGroup(); - manager.updateResourceGroup(copyForWrite -> { - copyForWrite.setResourceGroupEnabled(true); - }); + manager.updateResourceGroup(copyForWrite -> copyForWrite.setResourceGroupEnabled(true)); EpochManager epochManager = EpochManager.getInstance(); val prjMgr = NProjectManager.getInstance(getTestConfig()); for (ProjectInstance prj : prjMgr.listAllProjects()) { @@ -244,9 +242,7 @@ class EpochManagerTest { @Test void testGetEpochOwnerWithException() { EpochManager epochManager = EpochManager.getInstance(); - Assertions.assertThrows(IllegalStateException.class, () -> { - epochManager.getEpochOwner(null); - }); + Assertions.assertThrows(IllegalStateException.class, () -> epochManager.getEpochOwner(null)); } @Test @@ -392,6 +388,55 @@ class EpochManagerTest { } + @Test + void testEpochRenewTimeoutDefault() { + KylinConfig config = getTestConfig(); + double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate(); + Assertions.assertEquals(0.8, epochRenewTimeoutRate); + EpochManager manager = EpochManager.getInstance(); + Object epochExpiredTime = ReflectionTestUtils.getField(manager, "epochExpiredTime"); + Assertions.assertNotNull(epochExpiredTime); + Assertions.assertEquals(60, (long) epochExpiredTime); + + Object epochRenewTimeout = ReflectionTestUtils.getField(manager, "epochRenewTimeout"); + Assertions.assertNotNull(epochRenewTimeout); + Assertions.assertEquals(60 * epochRenewTimeoutRate, (int) epochRenewTimeout); + } + + @Test + @OverwriteProp(key = "kylin.server.leader-race.heart-beat-timeout-rate", value = "0.0") + void testEpochRenewTimeoutOverride1() { + KylinConfig config = getTestConfig(); + double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate(); + Assertions.assertEquals(0.0, epochRenewTimeoutRate); + + EpochManager manager = EpochManager.getInstance(); + Object epochExpiredTime = ReflectionTestUtils.getField(manager, "epochExpiredTime"); + Assertions.assertNotNull(epochExpiredTime); + Assertions.assertEquals(60, (long) epochExpiredTime); + + Object epochRenewTimeout = ReflectionTestUtils.getField(manager, "epochRenewTimeout"); + Assertions.assertNotNull(epochRenewTimeout); + Assertions.assertEquals(60, (int) epochRenewTimeout); + } + + @Test + @OverwriteProp(key = "kylin.server.leader-race.heart-beat-timeout-rate", value = "1.5") + void testEpochRenewTimeoutOverride2() { + KylinConfig config = getTestConfig(); + double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate(); + Assertions.assertEquals(1.5, epochRenewTimeoutRate); + + EpochManager manager = EpochManager.getInstance(); + Object epochExpiredTime = ReflectionTestUtils.getField(manager, "epochExpiredTime"); + Assertions.assertNotNull(epochExpiredTime); + Assertions.assertEquals(60, (long) epochExpiredTime); + + Object epochRenewTimeout = ReflectionTestUtils.getField(manager, "epochRenewTimeout"); + Assertions.assertNotNull(epochRenewTimeout); + Assertions.assertEquals(60 * epochRenewTimeoutRate, (int) epochRenewTimeout); + } + EpochStore getEpochStore() { try { return EpochStore.getEpochStore(getTestConfig());