This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 3.0.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bb03e2741276d4dd1181dadf40b205aefe2fc82c Author: nichunen <[email protected]> AuthorDate: Wed Jan 15 14:51:34 2020 +0800 Fix synchronization on boxed types or strings --- .../common/persistence/JDBCResourceStore.java | 47 +++++++++++++--------- .../apache/kylin/dict/lookup/SnapshotManager.java | 44 ++++++++++++-------- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java index 61bbb98..fd98383 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java @@ -32,6 +32,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.fs.FileSystem; import org.apache.kylin.common.KylinConfig; @@ -44,28 +45,17 @@ import com.google.common.base.Preconditions; public class JDBCResourceStore extends PushdownResourceStore { - private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class); - public static final String JDBC_SCHEME = "jdbc"; - + private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>(); private static final String META_TABLE_KEY = "META_TABLE_KEY"; - private static final String META_TABLE_TS = "META_TABLE_TS"; - private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT"; - - public static void checkScheme(StorageURL url) { - Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme())); - } - - // ============================================================================ - + private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class); private JDBCConnectionManager connectionManager; + // ============================================================================ private String[] tableNames = new String[2]; - private String metadataIdentifier = null; - // For test private long queriedSqlNum = 0; @@ -82,11 +72,21 @@ public class JDBCResourceStore extends PushdownResourceStore { } } - abstract static class SqlOperation { - PreparedStatement pstat = null; - ResultSet rs = null; + public static void checkScheme(StorageURL url) { + Preconditions.checkState(JDBC_SCHEME.equals(url.getScheme())); + } - abstract public void execute(final Connection connection) throws SQLException, IOException; + private Object getConcurrentObject(String resPath) { + if (!lockObjectMap.containsKey(resPath)) { + addObject(resPath); + } + return lockObjectMap.get(resPath); + } + + private synchronized void addObject(String resPath) { + if (!lockObjectMap.containsKey(resPath)) { + lockObjectMap.put(resPath, new Object()); + } } private void executeSql(SqlOperation operation) throws SQLException, IOException { @@ -349,7 +349,7 @@ public class JDBCResourceStore extends PushdownResourceStore { @Override public void execute(Connection connection) throws SQLException, IOException { byte[] bytes = content.extractAllBytes(); - synchronized (resPath.intern()) { + synchronized (getConcurrentObject(resPath)) { JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath)); boolean existing = existsImpl(resPath); if (existing) { @@ -439,7 +439,7 @@ public class JDBCResourceStore extends PushdownResourceStore { executeSql(new SqlOperation() { @Override public void execute(Connection connection) throws SQLException, IOException { - synchronized (resPath.intern()) { + synchronized (getConcurrentObject(resPath)) { JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath)); if (!existsImpl(resPath)) { if (oldTS != 0) { @@ -640,4 +640,11 @@ public class JDBCResourceStore extends PushdownResourceStore { return "/".equals(path); } + abstract static class SqlOperation { + PreparedStatement pstat = null; + ResultSet rs = null; + + abstract public void execute(final Connection connection) throws SQLException, IOException; + } + } \ No newline at end of file diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index 8f68fb0..76a3df9 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup; import java.io.IOException; import java.util.List; import java.util.NavigableSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -38,8 +39,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; import com.google.common.collect.Lists; /** @@ -48,23 +47,13 @@ import com.google.common.collect.Lists; public class SnapshotManager { private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); - - public static SnapshotManager getInstance(KylinConfig config) { - return config.getManager(SnapshotManager.class); - } - - // called by reflection - static SnapshotManager newInstance(KylinConfig config) throws IOException { - return new SnapshotManager(config); - } - - // ============================================================================ - + private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>(); private KylinConfig config; - // path ==> SnapshotTable private LoadingCache<String, SnapshotTable> snapshotCache; // resource + // ============================================================================ + private SnapshotManager(KylinConfig config) { this.config = config; this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() { @@ -83,6 +72,28 @@ public class SnapshotManager { }); } + public static SnapshotManager getInstance(KylinConfig config) { + return config.getManager(SnapshotManager.class); + } + + // called by reflection + static SnapshotManager newInstance(KylinConfig config) throws IOException { + return new SnapshotManager(config); + } + + private Object getConcurrentObject(String resPath) { + if (!lockObjectMap.containsKey(resPath)) { + addObject(resPath); + } + return lockObjectMap.get(resPath); + } + + private synchronized void addObject(String resPath) { + if (!lockObjectMap.containsKey(resPath)) { + lockObjectMap.put(resPath, new Object()); + } + } + public void wipeoutCache() { snapshotCache.invalidateAll(); } @@ -127,9 +138,8 @@ public class SnapshotManager { throws IOException { SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); snapshot.updateRandomUuid(); - Interner<String> pool = Interners.newWeakInterner(); - synchronized (pool.intern(tableDesc.getIdentity())) { + synchronized (getConcurrentObject(tableDesc.getIdentity())) { SnapshotTable reusableSnapshot = getReusableSnapShot(table, snapshot, tableDesc, cubeConfig); if (reusableSnapshot != null) return updateDictLastModifiedTime(reusableSnapshot.getResourcePath());
