This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3ba24 KYLIN-4291 Parallel segment building may cause
WriteConflictException
5e3ba24 is described below
commit 5e3ba2486ba4dcf68223e154124066086fa52235
Author: yaqian.zhang <[email protected]>
AuthorDate: Mon Jan 13 17:36:35 2020 +0800
KYLIN-4291 Parallel segment building may cause WriteConflictException
---
.../org/apache/kylin/dict/DictionaryManager.java | 31 +++++++++++++++++-----
.../apache/kylin/dict/lookup/SnapshotManager.java | 20 +++++++++++---
2 files changed, 42 insertions(+), 9 deletions(-)
diff --git
a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index ffee105..6b23a9e 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
@@ -193,12 +194,24 @@ public class DictionaryManager {
ResourceStore store = getStore();
if (StringUtils.isBlank(dictPath))
return NONE_INDICATOR;
- long now = System.currentTimeMillis();
- store.updateTimestamp(dictPath, now);
- logger.info("Update dictionary {} lastModifiedTime to {}", dictPath,
now);
- DictionaryInfo dictInfo = load(dictPath, true);
- updateDictCache(dictInfo);
- return dictInfo;
+
+ int retry = 7;
+ while (retry-- > 0) {
+ try {
+ long now = System.currentTimeMillis();
+ store.updateTimestamp(dictPath, now);
+ logger.info("Update dictionary {} lastModifiedTime to {}",
dictPath, now);
+ return loadAndUpdateLocalCache(dictPath);
+ } catch (WriteConflictException e) {
+ if (retry <= 0) {
+ logger.error("Retry is out, till got error,
abandoning...", e);
+ throw e;
+ }
+ logger.warn("Write conflict to update dictionary " + dictPath
+ " retry remaining " + retry
+ + ", will retry...");
+ }
+ }
+ return loadAndUpdateLocalCache(dictPath);
}
private void initDictInfo(Dictionary<String> newDict, DictionaryInfo
newDictInfo) {
@@ -411,6 +424,12 @@ public class DictionaryManager {
store.putBigResource(path, dict, System.currentTimeMillis(),
DictionaryInfoSerializer.FULL_SERIALIZER);
}
+ private DictionaryInfo loadAndUpdateLocalCache(String dictPath) throws
IOException {
+ DictionaryInfo dictInfo = load(dictPath, true);
+ updateDictCache(dictInfo);
+ return dictInfo;
+ }
+
DictionaryInfo load(String resourcePath, boolean loadDictObj) throws
IOException {
ResourceStore store = getStore();
diff --git
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 76a3df9..9d591b5 100644
---
a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
@@ -236,10 +237,23 @@ public class SnapshotManager {
private SnapshotTable updateDictLastModifiedTime(String snapshotPath)
throws IOException {
ResourceStore store = getStore();
- long now = System.currentTimeMillis();
- store.updateTimestamp(snapshotPath, now);
- logger.info("Update snapshotTable {} lastModifiedTime to {}",
snapshotPath, now);
+ int retry = 7;
+ while (retry-- > 0) {
+ try {
+ long now = System.currentTimeMillis();
+ store.updateTimestamp(snapshotPath, now);
+ logger.info("Update snapshotTable {} lastModifiedTime to {}",
snapshotPath, now);
+ return loadAndUpdateLocalCache(snapshotPath);
+ } catch (WriteConflictException e) {
+ if (retry <= 0) {
+ logger.error("Retry is out, till got error,
abandoning...", e);
+ throw e;
+ }
+ logger.warn("Write conflict to update snapshotTable " +
snapshotPath + " retry remaining " + retry
+ + ", will retry...");
+ }
+ }
// update cache
return loadAndUpdateLocalCache(snapshotPath);
}