This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 042521354589c0746ff509bead019710471035bf Author: Hongshuai Zhang <[email protected]> AuthorDate: Thu Jul 20 20:18:14 2023 +0800 [fix](multi-catalog) check properties when alter catalog (#20130) Previously, when a catalog was altered, its new properties were not verified; this commit adds that verification. My changes: when a catalog is altered, the new properties now go through a full check, and if an exception occurs, the properties are rolled back to their previous values. --- .../org/apache/doris/datasource/CatalogMgr.java | 26 +++++++- .../apache/doris/datasource/CatalogProperty.java | 6 ++ .../apache/doris/datasource/ExternalCatalog.java | 14 +++- .../java/org/apache/doris/persist/EditLog.java | 2 +- .../apache/doris/datasource/CatalogMgrTest.java | 76 +++++++++++++++++++++- 5 files changed, 117 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 3ed74e260b..a9a00fc61d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -336,6 +336,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { writeLock(); try { CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName()); + Map<String, String> oldProperties = catalog.getProperties(); if (catalog == null) { throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); } @@ -344,7 +345,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { throw new DdlException("Can't modify the type of catalog property with name: " + stmt.getCatalogName()); } CatalogLog log = CatalogFactory.createCatalogLog(catalog.getId(), stmt); - replayAlterCatalogProps(log); + replayAlterCatalogProps(log, oldProperties, false); Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_ALTER_CATALOG_PROPS, log); } finally { writeUnlock(); @@ -561,10 +562,31 
@@ public class CatalogMgr implements Writable, GsonPostProcessable { /** * Reply for alter catalog props event. */ - public void replayAlterCatalogProps(CatalogLog log) { + public void replayAlterCatalogProps(CatalogLog log, Map<String, String> oldProperties, boolean isReplay) + throws DdlException { writeLock(); try { CatalogIf catalog = idToCatalog.get(log.getCatalogId()); + if (catalog instanceof ExternalCatalog) { + Map<String, String> newProps = log.getNewProps(); + ((ExternalCatalog) catalog).tryModifyCatalogProps(newProps); + if (!isReplay) { + try { + ((ExternalCatalog) catalog).checkProperties(); + } catch (DdlException ddlException) { + if (oldProperties != null) { + ((ExternalCatalog) catalog).rollBackCatalogProps(oldProperties); + } + throw ddlException; + } + } + if (newProps.containsKey(METADATA_REFRESH_INTERVAL_SEC)) { + long catalogId = catalog.getId(); + Integer metadataRefreshIntervalSec = Integer.valueOf(newProps.get(METADATA_REFRESH_INTERVAL_SEC)); + Integer[] sec = {metadataRefreshIntervalSec, metadataRefreshIntervalSec}; + Env.getCurrentEnv().getRefreshManager().addToRefreshMap(catalogId, sec); + } + } catalog.modifyCatalogProps(log.getNewProps()); } finally { writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java index fd1403c72f..536ab74b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogProperty.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.Map; /** @@ -99,6 +100,11 @@ public class CatalogProperty implements Writable { properties.putAll(PropertyConverter.convertToMetaProperties(props)); } + public void rollBackCatalogProps(Map<String, String> props) { + properties.clear(); + 
properties = new HashMap<>(props); + } + public Map<String, String> getHadoopProperties() { Map<String, String> hadoopProperties = getProperties(); hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index cf2de86494..c169ed2c96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -179,7 +179,11 @@ public abstract class ExternalCatalog Map<String, String> properties = getCatalogProperty().getProperties(); if (properties.containsKey(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)) { try { - Integer.valueOf(properties.get(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)); + Integer metadataRefreshIntervalSec = Integer.valueOf( + properties.get(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)); + if (metadataRefreshIntervalSec < 0) { + throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC); + } } catch (NumberFormatException e) { throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC); } @@ -386,6 +390,14 @@ public abstract class ExternalCatalog notifyPropertiesUpdated(props); } + public void tryModifyCatalogProps(Map<String, String> props) { + catalogProperty.modifyCatalogProps(props); + } + + public void rollBackCatalogProps(Map<String, String> props) { + catalogProperty.rollBackCatalogProps(props); + } + private void modifyComment(Map<String, String> props) { setComment(props.getOrDefault("comment", comment)); props.remove("comment"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index f19d215e30..b087d53308 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -865,7 +865,7 @@ public class EditLog { } case OperationType.OP_ALTER_CATALOG_PROPS: { CatalogLog log = (CatalogLog) journal.getData(); - env.getCatalogMgr().replayAlterCatalogProps(log); + env.getCatalogMgr().replayAlterCatalogProps(log, null, true); break; } case OperationType.OP_REFRESH_CATALOG: { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java index ea49f99c25..09fc1bb1a0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/CatalogMgrTest.java @@ -239,17 +239,21 @@ public class CatalogMgrTest extends TestWithFeService { Map<String, String> alterProps2 = Maps.newHashMap(); alterProps2.put("dfs.nameservices", "service1"); alterProps2.put("dfs.ha.namenodes.service1", "nn1,nn2"); + alterProps2.put("dfs.namenode.rpc-address.service1.nn1", "nn1_host:rpc_port"); + alterProps2.put("dfs.namenode.rpc-address.service1.nn2", "nn2_host:rpc_port"); + alterProps2.put("dfs.client.failover.proxy.provider.service1", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); AlterCatalogPropertyStmt alterStmt = new AlterCatalogPropertyStmt(MY_CATALOG, alterProps2); mgr.alterCatalogProps(alterStmt); catalog = env.getCatalogMgr().getCatalog(MY_CATALOG); - Assert.assertEquals(6, catalog.getProperties().size()); + Assert.assertEquals(9, catalog.getProperties().size()); Assert.assertEquals("service1", catalog.getProperties().get("dfs.nameservices")); String showDetailCatalog = "SHOW CATALOG my_catalog"; ShowCatalogStmt showDetailStmt = (ShowCatalogStmt) parseAndAnalyzeStmt(showDetailCatalog); showResultSet = mgr.showCatalogs(showDetailStmt); - Assert.assertEquals(6, showResultSet.getResultRows().size()); + Assert.assertEquals(9, showResultSet.getResultRows().size()); for (List<String> row : 
showResultSet.getResultRows()) { Assertions.assertEquals(2, row.size()); if (row.get(0).equalsIgnoreCase("type")) { @@ -318,7 +322,7 @@ public class CatalogMgrTest extends TestWithFeService { CatalogIf hms = mgr2.getCatalog(MY_CATALOG); properties = hms.getProperties(); - Assert.assertEquals(6, properties.size()); + Assert.assertEquals(9, properties.size()); Assert.assertEquals("hms", properties.get("type")); Assert.assertEquals("thrift://172.16.5.9:9083", properties.get("hive.metastore.uris")); @@ -649,6 +653,72 @@ public class CatalogMgrTest extends TestWithFeService { ExceptionChecker.expectThrowsNoException(() -> mgr.createCatalog(createStmt7)); } + @Test + public void testInvalidAndValidAlterCatalogProperties() throws Exception { + + String catalogName = "test_hive1"; + String createCatalogSql = "CREATE CATALOG test_hive1 PROPERTIES (\n" + + " 'type'='hms',\n" + + " 'hive.metastore.uris' = 'thrift://127.0.0.1:7007',\n" + + " 'dfs.nameservices' = 'HANN',\n" + + " 'dfs.ha.namenodes.HANN'='nn1,nn2',\n" + + " 'dfs.namenode.rpc-address.HANN.nn1'='127.0.0.1:4007',\n" + + " 'dfs.namenode.rpc-address.HANN.nn2'='127.0.0.1:4008',\n" + + " 'dfs.client.failover.proxy.provider.HANN'" + + "='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'\n" + + ");"; + CreateCatalogStmt createStmt = (CreateCatalogStmt) parseAndAnalyzeStmt(createCatalogSql); + mgr.createCatalog(createStmt); + + String alterCatalogSql = "ALTER CATALOG test_hive1 SET PROPERTIES (\n" + + " 'type'='hms',\n" + + " 'hive.metastore.uris' = 'thrift://127.0.0.1:7007',\n" + + " 'dfs.nameservices' = 'HANN',\n" + + " 'dfs.ha.namenodes.HANN'='',\n" + + " 'dfs.client.failover.proxy.provider.HANN'" + + "='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'\n" + + ");"; + AlterCatalogPropertyStmt alterCatalogPropertyStmt1 = (AlterCatalogPropertyStmt) parseAndAnalyzeStmt( + alterCatalogSql); + ExceptionChecker.expectThrowsWithMsg(DdlException.class, + "Missing 
dfs.ha.namenodes.HANN property", + () -> mgr.alterCatalogProps(alterCatalogPropertyStmt1)); + + alterCatalogSql = "ALTER CATALOG test_hive1 SET PROPERTIES (\n" + + " 'type'='hms',\n" + + " 'hive.metastore.uris' = 'thrift://127.0.0.1:7007',\n" + + " 'dfs.nameservices' = 'HANN',\n" + + " 'dfs.ha.namenodes.HANN'='nn1,nn3',\n" + + " 'dfs.namenode.rpc-address.HANN.nn1'='127.0.0.1:4007',\n" + + " 'dfs.namenode.rpc-address.HANN.nn3'='127.0.0.1:4007',\n" + + " 'dfs.client.failover.proxy.provider.HANN'" + + "=''\n" + + ");"; + AlterCatalogPropertyStmt alterCatalogPropertyStmt2 = (AlterCatalogPropertyStmt) parseAndAnalyzeStmt( + alterCatalogSql); + ExceptionChecker.expectThrowsWithMsg(DdlException.class, + "Missing dfs.client.failover.proxy.provider.HANN property", + () -> mgr.alterCatalogProps(alterCatalogPropertyStmt2)); + + alterCatalogSql = "ALTER CATALOG test_hive1 SET PROPERTIES (\n" + + " 'type'='hms',\n" + + " 'hive.metastore.uris' = 'thrift://127.0.0.1:7007',\n" + + " 'dfs.nameservices' = 'HANN',\n" + + " 'dfs.ha.namenodes.HANN'='nn1,nn3',\n" + + " 'dfs.namenode.rpc-address.HANN.nn1'='127.0.0.1:4007',\n" + + " 'dfs.namenode.rpc-address.HANN.nn3'='127.0.0.1:4007',\n" + + " 'dfs.client.failover.proxy.provider.HANN'" + + "='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'\n" + + ");"; + AlterCatalogPropertyStmt alterCatalogPropertyStmt3 = (AlterCatalogPropertyStmt) parseAndAnalyzeStmt( + alterCatalogSql); + mgr.alterCatalogProps(alterCatalogPropertyStmt3); + + CatalogIf catalog = env.getCatalogMgr().getCatalog(catalogName); + Assert.assertEquals(10, catalog.getProperties().size()); + Assert.assertEquals("nn1,nn3", catalog.getProperties().get("dfs.ha.namenodes.HANN")); + } + public void testAlterFileCache() throws Exception { String catalogName = "good_hive_3"; String createCatalogSql = "CREATE CATALOG " + catalogName --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional 
commands, e-mail: [email protected]
