This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 2.6.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ec6248486f53189278a88c831081c488b4ea724c Author: yaqian.zhang <[email protected]> AuthorDate: Mon Jan 13 17:36:35 2020 +0800 KYLIN-4291 Parallel segment building may cause WriteConflictException --- .../org/apache/kylin/dict/DictionaryManager.java | 31 +++++++++++++++++----- .../apache/kylin/dict/lookup/SnapshotManager.java | 20 +++++++++++--- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index ffee105..6b23a9e 100755 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.WriteConflictException; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.JsonUtil; @@ -193,12 +194,24 @@ public class DictionaryManager { ResourceStore store = getStore(); if (StringUtils.isBlank(dictPath)) return NONE_INDICATOR; - long now = System.currentTimeMillis(); - store.updateTimestamp(dictPath, now); - logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now); - DictionaryInfo dictInfo = load(dictPath, true); - updateDictCache(dictInfo); - return dictInfo; + + int retry = 7; + while (retry-- > 0) { + try { + long now = System.currentTimeMillis(); + store.updateTimestamp(dictPath, now); + logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now); + return loadAndUpdateLocalCache(dictPath); + } catch (WriteConflictException e) { + if (retry <= 0) { + logger.error("Retry is out, till got error, abandoning...", e); + throw e; + } + logger.warn("Write conflict 
to update dictionary " + dictPath + " retry remaining " + retry + + ", will retry..."); + } + } + return loadAndUpdateLocalCache(dictPath); } private void initDictInfo(Dictionary<String> newDict, DictionaryInfo newDictInfo) { @@ -411,6 +424,12 @@ public class DictionaryManager { store.putBigResource(path, dict, System.currentTimeMillis(), DictionaryInfoSerializer.FULL_SERIALIZER); } + private DictionaryInfo loadAndUpdateLocalCache(String dictPath) throws IOException { + DictionaryInfo dictInfo = load(dictPath, true); + updateDictCache(dictInfo); + return dictInfo; + } + DictionaryInfo load(String resourcePath, boolean loadDictObj) throws IOException { ResourceStore store = getStore(); diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index 76a3df9..9d591b5 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.WriteConflictException; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.IReadableTable; @@ -236,10 +237,23 @@ public class SnapshotManager { private SnapshotTable updateDictLastModifiedTime(String snapshotPath) throws IOException { ResourceStore store = getStore(); - long now = System.currentTimeMillis(); - store.updateTimestamp(snapshotPath, now); - logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now); + int retry = 7; + while (retry-- > 0) { + try { + long now = System.currentTimeMillis(); + store.updateTimestamp(snapshotPath, now); + logger.info("Update snapshotTable {} lastModifiedTime to {}", 
snapshotPath, now); + return loadAndUpdateLocalCache(snapshotPath); + } catch (WriteConflictException e) { + if (retry <= 0) { + logger.error("Retry is out, till got error, abandoning...", e); + throw e; + } + logger.warn("Write conflict to update snapshotTable " + snapshotPath + " retry remaining " + retry + + ", will retry..."); + } + } // update cache return loadAndUpdateLocalCache(snapshotPath); }
