This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 92d8f6a [Alter] Allow submitting alter jobs when table is unstable
92d8f6a is described below
commit 92d8f6ae78a5c8d2bbf2bbdce68b7b75d237ef20
Author: WingC <[email protected]>
AuthorDate: Sat Jan 18 22:56:37 2020 +0800
[Alter] Allow submitting alter jobs when table is unstable
The alter job will wait for the table to become stable before running.
---
fe/src/main/java/org/apache/doris/alter/Alter.java | 5 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 11 +-
.../apache/doris/alter/SchemaChangeHandler.java | 6 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 14 +-
.../java/org/apache/doris/clone/TabletChecker.java | 153 ++++++++++-----------
.../org/apache/doris/alter/RollupJobV2Test.java | 88 +++++++++++-
.../apache/doris/alter/SchemaChangeJobV2Test.java | 82 +++++++++++
.../org/apache/doris/catalog/CatalogTestUtil.java | 7 +-
8 files changed, 272 insertions(+), 94 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java
b/fe/src/main/java/org/apache/doris/alter/Alter.java
index 9c2a008..b21b620 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -275,8 +275,9 @@ public class Alter {
if (olapTable.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "]'s state
is not NORMAL. Do not allow doing ALTER ops");
}
-
- if (needTableStable) {
+
+ // schema change job will wait until table becomes stable
+ if (needTableStable && !hasSchemaChange &&
!hasAddMaterializedView) {
// check if all tablets are healthy, and no tablet is in
tablet scheduler
boolean isStable =
olapTable.isStable(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentCatalog().getTabletScheduler(),
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 2b4cda7..880814d 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -168,8 +168,17 @@ public class RollupJobV2 extends AlterJobV2 {
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not
exist");
}
- Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
+ boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
+ Catalog.getCurrentCatalog().getTabletScheduler(),
+ db.getClusterName());
+ if (!isStable) {
+ errMsg = "table is unstable";
+ LOG.warn("doing rollup job: " + jobId + " while table is not
stable.");
+ return;
+ }
+
+ Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry :
this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index fbe36e5..c8b3109 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1310,12 +1310,12 @@ public class SchemaChangeHandler extends AlterHandler {
public void process(List<AlterClause> alterClauses, String clusterName,
Database db, OlapTable olapTable)
throws UserException {
// index id -> index schema
- Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<Long,
LinkedList<Column>>();
+ Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();
for (Map.Entry<Long, List<Column>> entry :
olapTable.getIndexIdToSchema().entrySet()) {
- indexSchemaMap.put(entry.getKey(), new
LinkedList<Column>(entry.getValue()));
+ indexSchemaMap.put(entry.getKey(), new
LinkedList<>(entry.getValue()));
}
List<Index> newIndexes = olapTable.getCopiedIndexes();
- Map<String, String> propertyMap = new HashMap<String, String>();
+ Map<String, String> propertyMap = new HashMap<>();
for (AlterClause alterClause : alterClauses) {
// get properties
Map<String, String> properties = alterClause.getProperties();
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index a8fdc7b..056a80e 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -184,13 +184,23 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
totalReplicaNum += tablet.getReplicas().size();
}
}
- MarkedCountDownLatch<Long, Long> countDownLatch = new
MarkedCountDownLatch<Long, Long>(totalReplicaNum);
+ MarkedCountDownLatch<Long, Long> countDownLatch = new
MarkedCountDownLatch<>(totalReplicaNum);
db.readLock();
try {
OlapTable tbl = (OlapTable) db.getTable(tableId);
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not
exist");
- }
+ }
+
+ boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
+ Catalog.getCurrentCatalog().getTabletScheduler(),
+ db.getClusterName());
+ if (!isStable) {
+ errMsg = "table is unstable";
+ LOG.warn("doing schema change job: " + jobId + " while table
is not stable.");
+ return;
+ }
+
Preconditions.checkState(tbl.getState() ==
OlapTableState.SCHEMA_CHANGE);
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
diff --git a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
index bae2438..cba2d8a 100644
--- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -100,6 +100,18 @@ public class TabletChecker extends MasterDaemon {
}
}
+ public static class RepairTabletInfo {
+ public long dbId;
+ public long tblId;
+ public List<Long> partIds;
+
+ public RepairTabletInfo(Long dbId, Long tblId, List<Long> partIds) {
+ this.dbId = dbId;
+ this.tblId = tblId;
+ this.partIds = partIds;
+ }
+ }
+
public TabletChecker(Catalog catalog, SystemInfoService infoService,
TabletScheduler tabletScheduler,
TabletSchedulerStat stat) {
super("tablet checker", CHECK_INTERVAL_MS);
@@ -109,42 +121,42 @@ public class TabletChecker extends MasterDaemon {
this.stat = stat;
}
- public void addPrios(long dbId, long tblId, List<Long> partitionIds, long
timeoutMs) {
- Preconditions.checkArgument(!partitionIds.isEmpty());
+ private void addPrios(RepairTabletInfo repairTabletInfo, long timeoutMs) {
+ Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
long currentTime = System.currentTimeMillis();
synchronized (prios) {
- Set<PrioPart> parts = prios.get(dbId, tblId);
+ Set<PrioPart> parts = prios.get(repairTabletInfo.dbId,
repairTabletInfo.tblId);
if (parts == null) {
parts = Sets.newHashSet();
- prios.put(dbId, tblId, parts);
+ prios.put(repairTabletInfo.dbId, repairTabletInfo.tblId,
parts);
}
- for (long partId : partitionIds) {
+ for (long partId : repairTabletInfo.partIds) {
PrioPart prioPart = new PrioPart(partId, currentTime,
timeoutMs);
parts.add(prioPart);
}
}
// we also need to change the priority of tablets which are already in
- tabletScheduler.changeTabletsPriorityToVeryHigh(dbId, tblId,
partitionIds);
+ tabletScheduler.changeTabletsPriorityToVeryHigh(repairTabletInfo.dbId,
repairTabletInfo.tblId, repairTabletInfo.partIds);
}
- private void removePrios(long dbId, long tblId, List<Long> partitionIds) {
- Preconditions.checkArgument(!partitionIds.isEmpty());
+ private void removePrios(RepairTabletInfo repairTabletInfo) {
+ Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
synchronized (prios) {
- Map<Long, Set<PrioPart>> tblMap = prios.row(dbId);
+ Map<Long, Set<PrioPart>> tblMap = prios.row(repairTabletInfo.dbId);
if (tblMap == null) {
return;
}
- Set<PrioPart> parts = tblMap.get(tblId);
+ Set<PrioPart> parts = tblMap.get(repairTabletInfo.tblId);
if (parts == null) {
return;
}
- for (long partId : partitionIds) {
+ for (long partId : repairTabletInfo.partIds) {
parts.remove(new PrioPart(partId, -1, -1));
}
if (parts.isEmpty()) {
- tblMap.remove(tblId);
+ tblMap.remove(repairTabletInfo.tblId);
}
}
@@ -271,7 +283,8 @@ public class TabletChecker extends MasterDaemon {
// priorities.
LOG.debug("partition is healthy, remove from
prios: {}-{}-{}",
db.getId(), olapTbl.getId(),
partition.getId());
- removePrios(db.getId(), olapTbl.getId(),
Lists.newArrayList(partition.getId()));
+ removePrios(new RepairTabletInfo(db.getId(),
+ olapTbl.getId(),
Lists.newArrayList(partition.getId())));
}
} // partitions
} // tables
@@ -356,53 +369,53 @@ public class TabletChecker extends MasterDaemon {
* when being scheduled.
*/
public void repairTable(AdminRepairTableStmt stmt) throws DdlException {
- Catalog catalog = Catalog.getCurrentCatalog();
- Database db = catalog.getDb(stmt.getDbName());
- if (db == null) {
- throw new DdlException("Database " + stmt.getDbName() + " does not
exist");
- }
+ RepairTabletInfo repairTabletInfo =
getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
+ addPrios(repairTabletInfo, stmt.getTimeoutS());
+ LOG.info("repair database: {}, table: {}, partition: {}",
repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
+ }
- long dbId = db.getId();
- long tblId = -1;
- List<Long> partIds = Lists.newArrayList();
- db.readLock();
- try {
- Table tbl = db.getTable(stmt.getTblName());
- if (tbl == null || tbl.getType() != TableType.OLAP) {
- throw new DdlException("Table does not exist or is not OLAP
table: " + stmt.getTblName());
+ /*
+ * handle ADMIN CANCEL REPAIR TABLE stmt sent by user.
+ * This operation will remove the specified partitions from 'prios'
+ */
+ public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws
DdlException {
+ RepairTabletInfo repairTabletInfo =
getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
+ removePrios(repairTabletInfo);
+ LOG.info("cancel repair database: {}, table: {}, partition: {}",
repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
+ }
+
+ public int getPrioPartitionNum() {
+ int count = 0;
+ synchronized (prios) {
+ for (Set<PrioPart> set : prios.values()) {
+ count += set.size();
}
+ }
+ return count;
+ }
- tblId = tbl.getId();
- OlapTable olapTable = (OlapTable) tbl;
- if (stmt.getPartitions().isEmpty()) {
- partIds = olapTable.getPartitions().stream().map(p ->
p.getId()).collect(Collectors.toList());
- } else {
- for (String partName : stmt.getPartitions()) {
- Partition partition = olapTable.getPartition(partName);
- if (partition == null) {
- throw new DdlException("Partition does not exist: " +
partName);
- }
- partIds.add(partition.getId());
+ public List<List<String>> getPriosInfo() {
+ List<List<String>> infos = Lists.newArrayList();
+ synchronized (prios) {
+ for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
+ for (PrioPart part : cell.getValue()) {
+ List<String> row = Lists.newArrayList();
+ row.add(cell.getRowKey().toString());
+ row.add(cell.getColumnKey().toString());
+ row.add(String.valueOf(part.partId));
+ row.add(String.valueOf(part.timeoutMs -
(System.currentTimeMillis() - part.addTime)));
+ infos.add(row);
}
}
- } finally {
- db.readUnlock();
}
-
- Preconditions.checkState(tblId != -1);
- addPrios(dbId, tblId, partIds, stmt.getTimeoutS() * 1000);
- LOG.info("repair database: {}, table: {}, partition: {}", dbId, tblId,
partIds);
+ return infos;
}
- /*
- * handle ADMIN CANCEL REPAIR TABLE stmt send by user.
- * This operation will remove the specified partitions from 'prios'
- */
- public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws
DdlException {
+ public static RepairTabletInfo getRepairTabletInfo(String dbName, String
tblName, List<String> partitions) throws DdlException {
Catalog catalog = Catalog.getCurrentCatalog();
- Database db = catalog.getDb(stmt.getDbName());
+ Database db = catalog.getDb(dbName);
if (db == null) {
- throw new DdlException("Database " + stmt.getDbName() + " does not
exist");
+ throw new DdlException("Database " + dbName + " does not exist");
}
long dbId = db.getId();
@@ -410,17 +423,18 @@ public class TabletChecker extends MasterDaemon {
List<Long> partIds = Lists.newArrayList();
db.readLock();
try {
- Table tbl = db.getTable(stmt.getTblName());
+ Table tbl = db.getTable(tblName);
if (tbl == null || tbl.getType() != TableType.OLAP) {
- throw new DdlException("Table does not exist or is not OLAP
table: " + stmt.getTblName());
+ throw new DdlException("Table does not exist or is not OLAP
table: " + tblName);
}
tblId = tbl.getId();
OlapTable olapTable = (OlapTable) tbl;
- if (stmt.getPartitions().isEmpty()) {
- partIds = olapTable.getPartitions().stream().map(p ->
p.getId()).collect(Collectors.toList());
+
+ if (partitions == null || partitions.isEmpty()) {
+ partIds =
olapTable.getPartitions().stream().map(Partition::getId).collect(Collectors.toList());
} else {
- for (String partName : stmt.getPartitions()) {
+ for (String partName : partitions) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
throw new DdlException("Partition does not exist: " +
partName);
@@ -433,34 +447,7 @@ public class TabletChecker extends MasterDaemon {
}
Preconditions.checkState(tblId != -1);
- removePrios(dbId, tblId, partIds);
- LOG.info("cancel repair database: {}, table: {}, partition: {}", dbId,
tblId, partIds);
- }
-
- public int getPrioPartitionNum() {
- int count = 0;
- synchronized (prios) {
- for (Set<PrioPart> set : prios.values()) {
- count += set.size();
- }
- }
- return count;
- }
- public List<List<String>> getPriosInfo() {
- List<List<String>> infos = Lists.newArrayList();
- synchronized (prios) {
- for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
- for (PrioPart part : cell.getValue()) {
- List<String> row = Lists.newArrayList();
- row.add(cell.getRowKey().toString());
- row.add(cell.getColumnKey().toString());
- row.add(String.valueOf(part.partId));
- row.add(String.valueOf(part.timeoutMs -
(System.currentTimeMillis() - part.addTime)));
- infos.add(row);
- }
- }
- }
- return infos;
+ return new RepairTabletInfo(dbId, tblId, partIds);
}
}
diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index 8f88bc0..d79b8c5 100644
--- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -58,6 +58,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
+
public class RollupJobV2Test {
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
private static GlobalTransactionMgr masterTransMgr;
@@ -69,11 +71,13 @@ public class RollupJobV2Test {
private static Analyzer analyzer;
private static AddRollupClause clause;
- FakeEditLog fakeEditLog;
+ private FakeCatalog fakeCatalog;
+ private FakeEditLog fakeEditLog;
@Before
public void setUp() throws InstantiationException, IllegalAccessException,
IllegalArgumentException,
InvocationTargetException, NoSuchMethodException,
SecurityException, AnalysisException {
+ fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
masterCatalog = CatalogTestUtil.createTestCatalog();
@@ -110,7 +114,9 @@ public class RollupJobV2Test {
@Test
public void testAddSchemaChange() throws UserException {
+ fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler =
Catalog.getInstance().getRollupHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(clause);
@@ -125,7 +131,9 @@ public class RollupJobV2Test {
// start a schema change, then finished
@Test
public void testSchemaChange1() throws Exception {
+ fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
MaterializedViewHandler materializedViewHandler =
Catalog.getInstance().getRollupHandler();
// add a rollup job
@@ -225,4 +233,82 @@ public class RollupJobV2Test {
*/
}
+ @Test
+ public void testSchemaChangeWhileTabletNotStable() throws Exception {
+ fakeCatalog = new FakeCatalog();
+ fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
+ MaterializedViewHandler materializedViewHandler =
Catalog.getInstance().getRollupHandler();
+
+ // add a rollup job
+ ArrayList<AlterClause> alterClauses = new ArrayList<>();
+ alterClauses.add(clause);
+ Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
+ OlapTable olapTable = (OlapTable)
db.getTable(CatalogTestUtil.testTableId1);
+ Partition testPartition =
olapTable.getPartition(CatalogTestUtil.testPartitionId1);
+ materializedViewHandler.process(alterClauses, db.getClusterName(), db,
olapTable);
+ Map<Long, AlterJobV2> alterJobsV2 =
materializedViewHandler.getAlterJobsV2();
+ Assert.assertEquals(1, alterJobsV2.size());
+ RollupJobV2 rollupJob = (RollupJobV2)
alterJobsV2.values().stream().findAny().get();
+
+ MaterializedIndex baseIndex = testPartition.getBaseIndex();
+ assertEquals(MaterializedIndex.IndexState.NORMAL,
baseIndex.getState());
+ assertEquals(Partition.PartitionState.NORMAL,
testPartition.getState());
+ assertEquals(OlapTableState.ROLLUP, olapTable.getState());
+
+ Tablet baseTablet = baseIndex.getTablets().get(0);
+ List<Replica> replicas = baseTablet.getReplicas();
+ Replica replica1 = replicas.get(0);
+ Replica replica2 = replicas.get(1);
+ Replica replica3 = replicas.get(2);
+
+ assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
+ assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
+ assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
+ assertEquals(-1, replica1.getLastFailedVersion());
+ assertEquals(-1, replica2.getLastFailedVersion());
+ assertEquals(-1, replica3.getLastFailedVersion());
+ assertEquals(CatalogTestUtil.testStartVersion,
replica1.getLastSuccessVersion());
+ assertEquals(CatalogTestUtil.testStartVersion,
replica2.getLastSuccessVersion());
+ assertEquals(CatalogTestUtil.testStartVersion,
replica3.getLastSuccessVersion());
+
+ // runPendingJob
+ replica1.setState(Replica.ReplicaState.DECOMMISSION);
+ materializedViewHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.PENDING, rollupJob.getJobState());
+
+ // table is stable, runPendingJob again
+ replica1.setState(Replica.ReplicaState.NORMAL);
+ materializedViewHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.WAITING_TXN, rollupJob.getJobState());
+ Assert.assertEquals(2,
testPartition.getMaterializedIndices(IndexExtState.ALL).size());
+ Assert.assertEquals(1,
testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+ Assert.assertEquals(1,
testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
+
+ // runWaitingTxnJob
+ materializedViewHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
+
+ // runWaitingTxnJob, task not finished
+ materializedViewHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.RUNNING, rollupJob.getJobState());
+
+ // finish all tasks
+ List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER);
+ Assert.assertEquals(3, tasks.size());
+ for (AgentTask agentTask : tasks) {
+ agentTask.setFinished(true);
+ }
+ MaterializedIndex shadowIndex =
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
+ for (Tablet shadowTablet : shadowIndex.getTablets()) {
+ for (Replica shadowReplica : shadowTablet.getReplicas()) {
+
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
+ testPartition.getVisibleVersionHash(),
shadowReplica.getDataSize(),
+ shadowReplica.getRowCount());
+ }
+ }
+
+ materializedViewHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState());
+ }
}
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index d462eaa..91a745b 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -207,6 +207,88 @@ public class SchemaChangeJobV2Test {
}
@Test
+ public void testSchemaChangeWhileTabletNotStable() throws Exception {
+ fakeCatalog = new FakeCatalog();
+ fakeEditLog = new FakeEditLog();
+ FakeCatalog.setCatalog(masterCatalog);
+ SchemaChangeHandler schemaChangeHandler =
Catalog.getInstance().getSchemaChangeHandler();
+
+ // add a schema change job
+ ArrayList<AlterClause> alterClauses = new ArrayList<>();
+ alterClauses.add(addColumnClause);
+ Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1);
+ OlapTable olapTable = (OlapTable)
db.getTable(CatalogTestUtil.testTableId1);
+ Partition testPartition =
olapTable.getPartition(CatalogTestUtil.testPartitionId1);
+ schemaChangeHandler.process(alterClauses, "default_cluster", db,
olapTable);
+ Map<Long, AlterJobV2> alterJobsV2 =
schemaChangeHandler.getAlterJobsV2();
+ Assert.assertEquals(1, alterJobsV2.size());
+ SchemaChangeJobV2 schemaChangeJob = (SchemaChangeJobV2)
alterJobsV2.values().stream().findAny().get();
+
+ MaterializedIndex baseIndex = testPartition.getBaseIndex();
+ assertEquals(IndexState.NORMAL, baseIndex.getState());
+ assertEquals(PartitionState.NORMAL, testPartition.getState());
+ assertEquals(OlapTableState.SCHEMA_CHANGE, olapTable.getState());
+
+ Tablet baseTablet = baseIndex.getTablets().get(0);
+ List<Replica> replicas = baseTablet.getReplicas();
+ Replica replica1 = replicas.get(0);
+ Replica replica2 = replicas.get(1);
+ Replica replica3 = replicas.get(2);
+
+ assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
+ assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
+ assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
+ assertEquals(-1, replica1.getLastFailedVersion());
+ assertEquals(-1, replica2.getLastFailedVersion());
+ assertEquals(-1, replica3.getLastFailedVersion());
+ assertEquals(CatalogTestUtil.testStartVersion,
replica1.getLastSuccessVersion());
+ assertEquals(CatalogTestUtil.testStartVersion,
replica2.getLastSuccessVersion());
+ assertEquals(CatalogTestUtil.testStartVersion,
replica3.getLastSuccessVersion());
+
+ // runPendingJob
+ replica1.setState(Replica.ReplicaState.DECOMMISSION);
+ schemaChangeHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.PENDING, schemaChangeJob.getJobState());
+
+ // table is stable runPendingJob again
+ replica1.setState(Replica.ReplicaState.NORMAL);
+ schemaChangeHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.WAITING_TXN,
schemaChangeJob.getJobState());
+ Assert.assertEquals(2,
testPartition.getMaterializedIndices(IndexExtState.ALL).size());
+ Assert.assertEquals(1,
testPartition.getMaterializedIndices(IndexExtState.VISIBLE).size());
+ Assert.assertEquals(1,
testPartition.getMaterializedIndices(IndexExtState.SHADOW).size());
+
+ // runWaitingTxnJob
+ schemaChangeHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
+
+ // runWaitingTxnJob, task not finished
+ schemaChangeHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
+
+ // runRunningJob
+ schemaChangeHandler.runAfterCatalogReady();
+ // task not finished, still running
+ Assert.assertEquals(JobState.RUNNING, schemaChangeJob.getJobState());
+
+ // finish alter tasks
+ List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER);
+ Assert.assertEquals(3, tasks.size());
+ for (AgentTask agentTask : tasks) {
+ agentTask.setFinished(true);
+ }
+ MaterializedIndex shadowIndex =
testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0);
+ for (Tablet shadowTablet : shadowIndex.getTablets()) {
+ for (Replica shadowReplica : shadowTablet.getReplicas()) {
+
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(),
testPartition.getVisibleVersionHash(), shadowReplica.getDataSize(),
shadowReplica.getRowCount());
+ }
+ }
+
+ schemaChangeHandler.runAfterCatalogReady();
+ Assert.assertEquals(JobState.FINISHED, schemaChangeJob.getJobState());
+ }
+
+ @Test
public void testModifyDynamicPartitionNormal() throws UserException {
fakeCatalog = new FakeCatalog();
fakeEditLog = new FakeEditLog();
diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 172f860..7a1276b 100644
--- a/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -89,8 +89,11 @@ public class CatalogTestUtil {
catalog.setEditLog(new EditLog("name"));
FakeCatalog.setCatalog(catalog);
Backend backend1 = createBackend(testBackendId1, "host1", 123, 124,
125);
- Backend backend2 = createBackend(testBackendId2, "host1", 123, 124,
125);
- Backend backend3 = createBackend(testBackendId3, "host1", 123, 124,
125);
+ Backend backend2 = createBackend(testBackendId2, "host2", 123, 124,
125);
+ Backend backend3 = createBackend(testBackendId3, "host3", 123, 124,
125);
+ backend1.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+ backend2.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
+ backend3.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
Catalog.getCurrentSystemInfo().addBackend(backend1);
Catalog.getCurrentSystemInfo().addBackend(backend2);
Catalog.getCurrentSystemInfo().addBackend(backend3);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]