This is an automated email from the ASF dual-hosted git repository.
adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8df4a94826 [fix](MTMV) Tasks leak when dropping job (#17984)
8df4a94826 is described below
commit 8df4a948269398faac679893634e06afbccbb52b
Author: huangzhaowei <[email protected]>
AuthorDate: Tue Mar 21 23:22:17 2023 +0800
[fix](MTMV) Tasks leak when dropping job (#17984)
1. Divide MTMV regression tests into 4 suites
2. Try to remove tasks that were killed by drop-job actions from the running
map.
---
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 2 +-
.../org/apache/doris/mtmv/MTMVTaskManager.java | 28 ++++++---
.../org/apache/doris/mtmv/MTMVJobManagerTest.java | 8 +--
regression-test/data/mtmv_p0/test_refresh_mtmv.out | 6 ++
...t_create_mtmv.groovy => test_alter_mtmv.groovy} | 61 ++----------------
...te_mtmv.groovy => test_create_both_mtmv.groovy} | 73 +++-------------------
.../suites/mtmv_p0/test_create_mtmv.groovy | 65 +------------------
...create_mtmv.groovy => test_refresh_mtmv.groovy} | 53 ++--------------
8 files changed, 53 insertions(+), 243 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index a6f10128a5..c6ff81c082 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -157,7 +157,7 @@ public class MTMVJobManager {
Metric.MetricUnit.NOUNIT, "Total task number of mtmv.") {
@Override
public Integer getValue() {
- return getTaskManager().getAllHistory().size();
+ return getTaskManager().getHistoryTasks().size();
}
};
totalTask.addLabel(new MetricLabel("type", "TOTAL-TASK"));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 2de94e5201..cbac71b9bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -69,7 +69,7 @@ public class MTMVTaskManager {
private final ReentrantLock reentrantLock = new ReentrantLock(true);
// keep track of all the completed tasks
- private final Deque<MTMVTask> historyQueue =
Queues.newLinkedBlockingDeque();
+ private final Deque<MTMVTask> historyTasks =
Queues.newLinkedBlockingDeque();
private ScheduledExecutorService taskScheduler =
Executors.newScheduledThreadPool(1);
@@ -276,11 +276,11 @@ public class MTMVTaskManager {
}
private void addHistory(MTMVTask task) {
- historyQueue.addFirst(task);
+ historyTasks.addFirst(task);
}
- public Deque<MTMVTask> getAllHistory() {
- return historyQueue;
+ public Deque<MTMVTask> getHistoryTasks() {
+ return historyTasks;
}
public List<MTMVTask> showAllTasks() {
@@ -295,7 +295,7 @@ public class MTMVTaskManager {
}
taskList.addAll(
getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask).collect(Collectors.toList()));
- taskList.addAll(getAllHistory());
+ taskList.addAll(getHistoryTasks());
} else {
for (Queue<MTMVTaskExecutor> pTaskQueue :
getPendingTaskMap().values()) {
taskList.addAll(
@@ -305,7 +305,7 @@ public class MTMVTaskManager {
taskList.addAll(getRunningTaskMap().values().stream().map(MTMVTaskExecutor::getTask)
.filter(u ->
u.getDBName().equals(dbName)).collect(Collectors.toList()));
taskList.addAll(
- getAllHistory().stream().filter(u ->
u.getDBName().equals(dbName)).collect(Collectors.toList()));
+ getHistoryTasks().stream().filter(u ->
u.getDBName().equals(dbName)).collect(Collectors.toList()));
}
return taskList.stream().sorted().collect(Collectors.toList());
@@ -417,7 +417,7 @@ public class MTMVTaskManager {
return;
}
try {
- Deque<MTMVTask> taskHistory = getAllHistory();
+ List<MTMVTask> taskHistory = showAllTasks();
for (MTMVTask task : taskHistory) {
if (task.getJobName().equals(jobName)) {
clearTasks.add(task.getTaskId());
@@ -438,7 +438,7 @@ public class MTMVTaskManager {
return;
}
try {
- Deque<MTMVTask> taskHistory = getAllHistory();
+ Deque<MTMVTask> taskHistory = getHistoryTasks();
for (MTMVTask task : taskHistory) {
long expireTime = task.getExpireTime();
if (currentTime > expireTime) {
@@ -460,7 +460,17 @@ public class MTMVTaskManager {
}
try {
Set<String> taskSet = new HashSet<>(taskIds);
- getAllHistory().removeIf(mtmvTask ->
taskSet.contains(mtmvTask.getTaskId()));
+ // Pending tasks will be clear directly. So we don't drop it again
here.
+ // Check the running task since the task was killed but was not
move to the history queue.
+ for (long key : runningTaskMap.keySet()) {
+ MTMVTaskExecutor executor = runningTaskMap.get(key);
+ // runningTaskMap may be removed in the runningIterator
+ if (executor != null &&
taskSet.contains(executor.getTask().getTaskId())) {
+ runningTaskMap.remove(key);
+ }
+ }
+ // Try to remove history tasks.
+ getHistoryTasks().removeIf(mtmvTask ->
taskSet.contains(mtmvTask.getTaskId()));
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
index 55cf3dceb0..9c5b98df11 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
@@ -58,15 +58,15 @@ public class MTMVJobManagerTest extends TestWithFeService {
public void testSchedulerJob() throws DdlException, InterruptedException {
MTMVJobManager jobManager = new MTMVJobManager();
jobManager.start();
-
Assertions.assertTrue(jobManager.getTaskManager().getAllHistory().isEmpty());
+
Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().isEmpty());
MTMVJob job = MTMVUtilsTest.createSchedulerJob();
jobManager.createJob(job, false);
Assertions.assertEquals(1,
jobManager.showJobs(MTMVUtilsTest.dbName).size());
- while (jobManager.getTaskManager().getAllHistory().isEmpty()) {
+ while (jobManager.getTaskManager().getHistoryTasks().isEmpty()) {
Thread.sleep(1000L);
System.out.println("Loop once");
}
-
Assertions.assertTrue(jobManager.getTaskManager().getAllHistory().size() > 0);
+
Assertions.assertTrue(jobManager.getTaskManager().getHistoryTasks().size() > 0);
}
@Test
@@ -83,7 +83,7 @@ public class MTMVJobManagerTest extends TestWithFeService {
System.out.println("Loop once");
}
- Assertions.assertEquals(1,
jobManager.getTaskManager().getAllHistory().size());
+ Assertions.assertEquals(1,
jobManager.getTaskManager().getHistoryTasks().size());
Assertions.assertEquals(1,
jobManager.getTaskManager().showAllTasks().size());
Assertions.assertEquals(1,
jobManager.getTaskManager().showTasks(MTMVUtilsTest.dbName).size());
Assertions.assertEquals(1,
diff --git a/regression-test/data/mtmv_p0/test_refresh_mtmv.out
b/regression-test/data/mtmv_p0/test_refresh_mtmv.out
new file mode 100644
index 0000000000..75d5531799
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_refresh_mtmv.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select --
+clz 200
+lisi 300
+zhangsang 200
+
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_alter_mtmv.groovy
similarity index 63%
copy from regression-test/suites/mtmv_p0/test_create_mtmv.groovy
copy to regression-test/suites/mtmv_p0/test_alter_mtmv.groovy
index 476ab32a52..631e4997f8 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_alter_mtmv.groovy
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_create_mtmv") {
- def tableName = "t_user"
- def tableNamePv = "t_user_pv"
- def mvName = "multi_mv"
+suite("test_alter_mtmv") {
+ def tableName = "t_test_alter_mtmv_user"
+ def tableNamePv = "t_test_alter_mtmv_pv"
+ def mvName = "multi_mv_test_alter_mtmv"
sql """
ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true");
"""
@@ -68,6 +68,7 @@ suite("test_create_mtmv") {
SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
"""
+ // waiting the task to be finished.
def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}"
def index = show_task_meta.indexOf(['State', 'CHAR'])
def query = "SHOW MTMV TASK ON ${mvName}"
@@ -82,55 +83,6 @@ suite("test_create_mtmv") {
Thread.sleep(1000);
} while (state.equals('PENDING') || state.equals('RUNNING'))
- assertEquals 'SUCCESS', state, show_task_result.last().toString()
- order_qt_select "SELECT * FROM ${mvName}"
-
- sql """
- DROP MATERIALIZED VIEW ${mvName}
- """
-
- // test only one job created when build IMMEDIATE and start time is before
now.
- sql """
- CREATE MATERIALIZED VIEW ${mvName}
- BUILD IMMEDIATE REFRESH COMPLETE
- start with "2022-11-03 00:00:00" next 1 DAY
- KEY(username)
- DISTRIBUTED BY HASH (username) buckets 1
- PROPERTIES ('replication_num' = '1')
- AS
- SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
- """
- // wait task to be finished to avoid task leak in suite.
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
- def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
- // test REFRESH make sure only define one mv and already run a task.
- sql """
- REFRESH MATERIALIZED VIEW ${mvName} COMPLETE
- """
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
- assertEquals 'SUCCESS', state, show_task_result.last().toString()
- assertEquals 2, show_task_result.size()
-
// test alter mtmv
sql """
alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with
"2022-11-03 00:00:00" next 2 DAY
@@ -139,7 +91,7 @@ suite("test_create_mtmv") {
def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR'])
show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
+ assertEquals 1, show_job_result.size(), show_job_result.toString()
assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)',
show_job_result.last().get(scheduleIndex).toString(),
show_job_result.last().toString()
@@ -147,4 +99,3 @@ suite("test_create_mtmv") {
DROP MATERIALIZED VIEW ${mvName}
"""
}
-
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_create_both_mtmv.groovy
similarity index 59%
copy from regression-test/suites/mtmv_p0/test_create_mtmv.groovy
copy to regression-test/suites/mtmv_p0/test_create_both_mtmv.groovy
index 476ab32a52..bf692de248 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_create_both_mtmv.groovy
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_create_mtmv") {
- def tableName = "t_user"
- def tableNamePv = "t_user_pv"
- def mvName = "multi_mv"
+suite("test_create_both_mtmv") {
+ def tableName = "t_test_create_both_mtmv_user"
+ def tableNamePv = "t_test_create_both_mtmv_user_pv"
+ def mvName = "multi_mv_test_create_both_mtmv"
sql """
ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true");
"""
@@ -58,16 +58,18 @@ suite("test_create_mtmv") {
sql """drop materialized view if exists ${mvName}"""
+ // test only one job created when build IMMEDIATE and start time is before
now.
sql """
CREATE MATERIALIZED VIEW ${mvName}
BUILD IMMEDIATE REFRESH COMPLETE
+ start with "2022-11-03 00:00:00" next 1 DAY
KEY(username)
DISTRIBUTED BY HASH (username) buckets 1
PROPERTIES ('replication_num' = '1')
AS
SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
"""
-
+ // wait task to be finished to avoid task leak in suite.
def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}"
def index = show_task_meta.indexOf(['State', 'CHAR'])
def query = "SHOW MTMV TASK ON ${mvName}"
@@ -82,69 +84,10 @@ suite("test_create_mtmv") {
Thread.sleep(1000);
} while (state.equals('PENDING') || state.equals('RUNNING'))
- assertEquals 'SUCCESS', state, show_task_result.last().toString()
- order_qt_select "SELECT * FROM ${mvName}"
-
- sql """
- DROP MATERIALIZED VIEW ${mvName}
- """
-
- // test only one job created when build IMMEDIATE and start time is before
now.
- sql """
- CREATE MATERIALIZED VIEW ${mvName}
- BUILD IMMEDIATE REFRESH COMPLETE
- start with "2022-11-03 00:00:00" next 1 DAY
- KEY(username)
- DISTRIBUTED BY HASH (username) buckets 1
- PROPERTIES ('replication_num' = '1')
- AS
- SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
- """
- // wait task to be finished to avoid task leak in suite.
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
- // test REFRESH make sure only define one mv and already run a task.
- sql """
- REFRESH MATERIALIZED VIEW ${mvName} COMPLETE
- """
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
- assertEquals 'SUCCESS', state, show_task_result.last().toString()
- assertEquals 2, show_task_result.size()
-
- // test alter mtmv
- sql """
- alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with
"2022-11-03 00:00:00" next 2 DAY
- """
- show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}"
- def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR'])
-
- show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
- assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)',
show_job_result.last().get(scheduleIndex).toString(),
show_job_result.last().toString()
+ assertEquals 1, show_job_result.size(), show_job_result.toString()
sql """
DROP MATERIALIZED VIEW ${mvName}
"""
}
-
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
index 476ab32a52..fa22595a1d 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
@@ -16,9 +16,9 @@
// under the License.
suite("test_create_mtmv") {
- def tableName = "t_user"
- def tableNamePv = "t_user_pv"
- def mvName = "multi_mv"
+ def tableName = "t_test_create_mtmv_user"
+ def tableNamePv = "t_test_create_mtmv_user_pv"
+ def mvName = "multi_mv_test_create_mtmv"
sql """
ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true");
"""
@@ -88,63 +88,4 @@ suite("test_create_mtmv") {
sql """
DROP MATERIALIZED VIEW ${mvName}
"""
-
- // test only one job created when build IMMEDIATE and start time is before
now.
- sql """
- CREATE MATERIALIZED VIEW ${mvName}
- BUILD IMMEDIATE REFRESH COMPLETE
- start with "2022-11-03 00:00:00" next 1 DAY
- KEY(username)
- DISTRIBUTED BY HASH (username) buckets 1
- PROPERTIES ('replication_num' = '1')
- AS
- SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
- """
- // wait task to be finished to avoid task leak in suite.
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
- def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
- // test REFRESH make sure only define one mv and already run a task.
- sql """
- REFRESH MATERIALIZED VIEW ${mvName} COMPLETE
- """
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
- assertEquals 'SUCCESS', state, show_task_result.last().toString()
- assertEquals 2, show_task_result.size()
-
- // test alter mtmv
- sql """
- alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with
"2022-11-03 00:00:00" next 2 DAY
- """
- show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}"
- def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR'])
-
- show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
- assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)',
show_job_result.last().get(scheduleIndex).toString(),
show_job_result.last().toString()
-
- sql """
- DROP MATERIALIZED VIEW ${mvName}
- """
}
-
diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
similarity index 66%
copy from regression-test/suites/mtmv_p0/test_create_mtmv.groovy
copy to regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
index 476ab32a52..8a7cfbac97 100644
--- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_refresh_mtmv.groovy
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_create_mtmv") {
- def tableName = "t_user"
- def tableNamePv = "t_user_pv"
- def mvName = "multi_mv"
+suite("test_refresh_mtmv") {
+ def tableName = "t_test_refresh_mtmv_user"
+ def tableNamePv = "t_test_refresh_mtmv_user_pv"
+ def mvName = "multi_mv_test_refresh_mtmv"
sql """
ADMIN SET FRONTEND CONFIG("enable_mtmv_scheduler_framework"="true");
"""
@@ -68,6 +68,7 @@ suite("test_create_mtmv") {
SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
"""
+ // waiting the task to be finished.
def show_task_meta = sql_meta "SHOW MTMV TASK ON ${mvName}"
def index = show_task_meta.indexOf(['State', 'CHAR'])
def query = "SHOW MTMV TASK ON ${mvName}"
@@ -85,35 +86,6 @@ suite("test_create_mtmv") {
assertEquals 'SUCCESS', state, show_task_result.last().toString()
order_qt_select "SELECT * FROM ${mvName}"
- sql """
- DROP MATERIALIZED VIEW ${mvName}
- """
-
- // test only one job created when build IMMEDIATE and start time is before
now.
- sql """
- CREATE MATERIALIZED VIEW ${mvName}
- BUILD IMMEDIATE REFRESH COMPLETE
- start with "2022-11-03 00:00:00" next 1 DAY
- KEY(username)
- DISTRIBUTED BY HASH (username) buckets 1
- PROPERTIES ('replication_num' = '1')
- AS
- SELECT ${tableName}.username, ${tableNamePv}.pv FROM ${tableName},
${tableNamePv} WHERE ${tableName}.id=${tableNamePv}.id;
- """
- // wait task to be finished to avoid task leak in suite.
- state = "PENDING"
- do {
- show_task_result = sql "${query}"
- if (!show_task_result.isEmpty()) {
- state = show_task_result.last().get(index)
- }
- println "The state of ${query} is ${state}"
- Thread.sleep(1000);
- } while (state.equals('PENDING') || state.equals('RUNNING'))
-
- def show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
// test REFRESH make sure only define one mv and already run a task.
sql """
REFRESH MATERIALIZED VIEW ${mvName} COMPLETE
@@ -129,22 +101,9 @@ suite("test_create_mtmv") {
} while (state.equals('PENDING') || state.equals('RUNNING'))
assertEquals 'SUCCESS', state, show_task_result.last().toString()
- assertEquals 2, show_task_result.size()
-
- // test alter mtmv
- sql """
- alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with
"2022-11-03 00:00:00" next 2 DAY
- """
- show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}"
- def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR'])
-
- show_job_result = sql "SHOW MTMV JOB ON ${mvName}"
- assertEquals 1, show_job_result.size()
-
- assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)',
show_job_result.last().get(scheduleIndex).toString(),
show_job_result.last().toString()
+ assertEquals 2, show_task_result.size(), show_task_result.toString()
sql """
DROP MATERIALIZED VIEW ${mvName}
"""
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]