This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6789a56f059 [fix](create table) fix create table fail cause meta
leaky #37488, #33574 (#37516)
6789a56f059 is described below
commit 6789a56f059e51ed38b758aae53cc2df4d39b8fc
Author: yujun <[email protected]>
AuthorDate: Tue Jul 9 17:17:12 2024 +0800
[fix](create table) fix create table fail cause meta leaky #37488, #33574
(#37516)
cherry-pick: #37488, #33574
---------
Co-authored-by: deardeng <[email protected]>
---
.../apache/doris/datasource/InternalCatalog.java | 37 +++++-
.../test_create_table_exception.groovy | 143 +++++++++++++++++++++
2 files changed, 174 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 8c027412fcf..29a8d20321b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -958,21 +958,26 @@ public class InternalCatalog implements
CatalogIf<Database> {
return true;
}
- public void replayDropTable(Database db, long tableId, boolean isForceDrop,
- Long recycleTime) throws MetaNotFoundException {
+ private void dropTable(Database db, long tableId, boolean isForceDrop,
boolean isReplay,
+ Long recycleTime) throws MetaNotFoundException {
Table table = db.getTableOrMetaException(tableId);
db.writeLock();
table.writeLock();
try {
- unprotectDropTable(db, table, isForceDrop, true, recycleTime);
-
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(),
db.getId(),
- tableId);
+ unprotectDropTable(db, table, isForceDrop, isReplay, recycleTime);
+
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(),
db.getId(), tableId);
+
//Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
} finally {
table.writeUnlock();
db.writeUnlock();
}
}
+ public void replayDropTable(Database db, long tableId, boolean isForceDrop,
+ Long recycleTime) throws MetaNotFoundException {
+ dropTable(db, tableId, isForceDrop, true, recycleTime);
+ }
+
public void replayEraseTable(long tableId) {
Env.getCurrentRecycleBin().replayEraseTable(tableId);
}
@@ -2468,6 +2473,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// if failed in any step, use this set to do clear things
Set<Long> tabletIdSet = new HashSet<>();
// create partition
+ boolean hadLogEditCreateTable = false;
try {
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
if (storagePolicy.equals("") && properties != null &&
!properties.isEmpty()) {
@@ -2613,6 +2619,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
LOG.info("duplicate create table[{};{}], skip next steps",
tableName, tableId);
tableHasExists = true;
} else {
+ // if the table does not exist, then db.createTableWithLock will write
an editlog.
+ hadLogEditCreateTable = true;
+
// we have added these index to memory, only need to persist
here
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
GroupId groupId =
Env.getCurrentColocateIndex().getGroup(tableId);
@@ -2631,14 +2640,30 @@ public class InternalCatalog implements
CatalogIf<Database> {
.createOrUpdateRuntimeInfo(tableId,
DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
}
+
+ if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
+ LOG.info("debug point FE.createOlapTable.exception, throw e");
+ throw new DdlException("debug point
FE.createOlapTable.exception");
+ }
} catch (DdlException e) {
+ LOG.warn("create table failed {} - {}", tabletIdSet,
e.getMessage());
for (Long tabletId : tabletIdSet) {
Env.getCurrentInvertedIndex().deleteTablet(tabletId);
}
- // only remove from memory, because we have not persist it
+ // writing DropTableInfo to the edit log will result in deleting the colocate
group,
+ // but follower FEs may need to wait 30s (the recycle bin mgr runs every 30s).
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
Env.getCurrentColocateIndex().removeTable(tableId);
}
+ try {
+ dropTable(db, tableId, true, false, 0L);
+ if (hadLogEditCreateTable) {
+ DropInfo info = new DropInfo(db.getId(), tableId,
olapTable.getName(), -1L, true, 0L);
+ Env.getCurrentEnv().getEditLog().logDropTable(info);
+ }
+ } catch (Exception ex) {
+ LOG.warn("drop table", ex);
+ }
throw e;
}
diff --git
a/regression-test/suites/partition_p0/test_create_table_exception.groovy
b/regression-test/suites/partition_p0/test_create_table_exception.groovy
new file mode 100644
index 00000000000..c3069d8e592
--- /dev/null
+++ b/regression-test/suites/partition_p0/test_create_table_exception.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_create_table_exception") {
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ options.setFeNum(3)
+ options.feConfigs.add('max_dynamic_partition_num=2000')
+
+ docker(options) {
+ sleep 2000
+ def table1 = "normal_table"
+ def table2 = "range_table"
+ def table3 = "dynamic_partition_table"
+ try {
+
GetDebugPoint().enableDebugPointForAllFEs('FE.createOlapTable.exception', null)
+ def createTable = { tableIdx ->
+ try_sql """
+ CREATE TABLE ${table1}_${tableIdx} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+ PROPERTIES (
+ "colocate_with" = "col_grp_${tableIdx}",
+ "replication_num"="3"
+ );
+ """
+
+ try_sql """
+ CREATE TABLE IF NOT EXISTS ${table2}_${tableIdx} (
+ lo_orderdate int(11) NOT NULL COMMENT "",
+ lo_orderkey bigint(20) NOT NULL COMMENT "",
+ lo_linenumber bigint(20) NOT NULL COMMENT "",
+ lo_custkey int(11) NOT NULL COMMENT "",
+ lo_partkey int(11) NOT NULL COMMENT "",
+ lo_suppkey int(11) NOT NULL COMMENT "",
+ lo_orderpriority varchar(64) NOT NULL COMMENT "",
+ lo_shippriority int(11) NOT NULL COMMENT "",
+ lo_quantity bigint(20) NOT NULL COMMENT "",
+ lo_extendedprice bigint(20) NOT NULL COMMENT "",
+ lo_ordtotalprice bigint(20) NOT NULL COMMENT "",
+ lo_discount bigint(20) NOT NULL COMMENT "",
+ lo_revenue bigint(20) NOT NULL COMMENT "",
+ lo_supplycost bigint(20) NOT NULL COMMENT "",
+ lo_tax bigint(20) NOT NULL COMMENT "",
+ lo_commitdate bigint(20) NOT NULL COMMENT "",
+ lo_shipmode varchar(64) NOT NULL COMMENT "" )
+ ENGINE=OLAP
+ UNIQUE KEY(lo_orderdate, lo_orderkey, lo_linenumber)
+ COMMENT "OLAP"
+ PARTITION BY RANGE(lo_orderdate) (
+ PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+ PARTITION p1993 VALUES [("19930101"), ("19940101")),
+ PARTITION p1994 VALUES [("19940101"), ("19950101")),
+ PARTITION p1995 VALUES [("19950101"), ("19960101")),
+ PARTITION p1996 VALUES [("19960101"), ("19970101")),
+ PARTITION p1997 VALUES [("19970101"), ("19980101")),
+ PARTITION p1998 VALUES [("19980101"), ("19990101")))
+ DISTRIBUTED BY HASH(lo_orderkey) BUCKETS 48;
+ """
+
+ try_sql """
+ CREATE TABLE ${table3}_${tableIdx} (
+ time date,
+ key1 int,
+ key2 int,
+ value1 int,
+ value2 int
+ ) ENGINE = OLAP UNIQUE KEY(
+ `time`,
+ `key1`,
+ `key2`
+ ) COMMENT 'OLAP' PARTITION BY RANGE(`time`)()
+ DISTRIBUTED BY HASH(`key1`) BUCKETS 6 PROPERTIES (
+ "bloom_filter_columns" = "time",
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.time_zone" = "Asia/Shanghai",
+ "dynamic_partition.start" = "-730",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "2",
+ "dynamic_partition.create_history_partition" = "true",
+ "dynamic_partition.history_partition_num" = "-1",
+ "dynamic_partition.hot_partition_num" = "0",
+ "dynamic_partition.reserved_history_periods" = "NULL",
+ "enable_unique_key_merge_on_write" = "true",
+ "light_schema_change" = "true"
+ );
+ """
+ }
+ createTable(1)
+ def result = sql """show tables;"""
+ assertEquals(result.size(), 0)
+
+ def checkResult = { ->
+ def tables = sql """show tables;"""
+ log.info("tables=" + tables)
+ assertEquals(3, tables.size())
+
+ def groups = sql """ show proc "/colocation_group" """
+ log.info("groups=" + groups)
+ assertEquals(1, groups.size())
+ }
+
+
GetDebugPoint().disableDebugPointForAllFEs('FE.createOlapTable.exception')
+ createTable(2)
+ checkResult()
+
+ sleep 1000
+ cluster.restartFrontends(cluster.getMasterFe().index)
+ sleep 32_000
+ def newMasterFe = cluster.getMasterFe()
+ def newMasterFeUrl =
"jdbc:mysql://${newMasterFe.host}:${newMasterFe.queryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false"
+ newMasterFeUrl = context.config.buildUrlWithDb(newMasterFeUrl,
context.dbName)
+ connect('root', '', newMasterFeUrl) {
+ checkResult()
+ }
+
+ } finally {
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]