This is an automated email from the ASF dual-hosted git repository.
jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 2647930f52 KYLIN-6090 Add project permission verification to the job information retrieval API
2647930f52 is described below
commit 2647930f5224312b43b572ceaf4193d049fb433a
Author: jlfsdtc <[email protected]>
AuthorDate: Thu Jun 11 15:52:23 2026 +0800
KYLIN-6090 Add project permission verification to the job information retrieval API
---
.../org/apache/kylin/job/common/SegmentUtil.java | 6 +-
.../java/org/apache/kylin/job/dao/JobInfoDao.java | 35 ++-
.../kylin/job/execution/ExecutableManager.java | 37 +--
.../org/apache/kylin/job/rest/JobMapperFilter.java | 28 +-
.../apache/kylin/job/runners/JobCheckRunner.java | 4 +-
.../kylin/job/scheduler/JdbcJobScheduler.java | 8 +-
.../resources/mybatis-mapper/JobInfoMapper.xml | 10 +
.../org/apache/kylin/job/dao/JobInfoDaoTest.java | 108 +++++++
.../kylin/job/scheduler/JdbcJobSchedulerTest.java | 2 +-
.../kylin/rest/controller/JobController.java | 6 +-
.../kylin/rest/controller/JobControllerTest.java | 2 +
.../apache/kylin/rest/service/JobInfoService.java | 16 +-
.../kylin/rest/service/JobResourceService.java | 6 +-
.../org/apache/kylin/rest/service/JobService.java | 3 +-
.../org/apache/kylin/rest/util/JobFilterUtil.java | 22 +-
.../kylin/rest/service/JobInfoServiceTest.java | 325 ++++++++++++++-------
.../apache/kylin/rest/util/JobFilterUtilTest.java | 115 ++++++++
17 files changed, 559 insertions(+), 174 deletions(-)
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
index bdaa6c364d..3bfb90a535 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/common/SegmentUtil.java
@@ -180,10 +180,8 @@ public class SegmentUtil {
public static Segments<NDataSegment> getValidSegments(String modelId,
String project) {
val df =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).getDataflow(modelId);
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
- jobMapperFilter.setModelIds(Lists.newArrayList(modelId));
- jobMapperFilter.setStatuses(ExecutableState.getNotFinalStates());
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project)
+
.modelIds(Lists.newArrayList(modelId)).statuses(ExecutableState.getNotFinalStates()).build();
List<JobInfo> runningJobInfoList =
JobContextUtil.getJobInfoDao(KylinConfig.getInstanceFromEnv())
.getJobInfoListByFilter(jobMapperFilter);
ExecutableManager executableManager =
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
index 638ce3c297..c2762c5227 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobInfoDao.java
@@ -25,17 +25,20 @@ import static
org.apache.kylin.job.util.JobInfoUtil.JOB_SERIALIZER;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.job.domain.JobInfo;
import org.apache.kylin.job.domain.JobLock;
import org.apache.kylin.job.exception.ExecuteRuntimeException;
@@ -73,9 +76,16 @@ public class JobInfoDao {
@Setter
private JobLockMapper jobLockMapper;
+ public List<JobInfo> getJobInfoListByProjectFilter(final JobMapperFilter
jobMapperFilter) {
+ if (StringUtils.isBlank(jobMapperFilter.getProject())
+ && CollectionUtils.isEmpty(jobMapperFilter.getProjects())) {
+ return Collections.emptyList();
+ }
+ return getJobInfoListByFilter(jobMapperFilter);
+ }
+
public List<JobInfo> getJobInfoListByFilter(final JobMapperFilter
jobMapperFilter) {
- List<JobInfo> jobInfoList =
jobInfoMapper.selectByJobFilter(jobMapperFilter);
- return jobInfoList;
+ return jobInfoMapper.selectByJobFilter(jobMapperFilter);
}
public long countByFilter(JobMapperFilter jobMapperFilter) {
@@ -83,8 +93,7 @@ public class JobInfoDao {
}
public List<ExecutablePO> getJobs(String project) {
- JobMapperFilter filter = new JobMapperFilter();
- filter.setProject(project);
+ JobMapperFilter filter =
JobMapperFilter.builder().project(project).build();
return
jobInfoMapper.selectByJobFilter(filter).stream().map(JobInfoUtil::deserializeExecutablePO)
.collect(Collectors.toList());
}
@@ -136,12 +145,20 @@ public class JobInfoDao {
return null;
}
+ public ExecutablePO getExecutablePoByUuidWithProject(String project,
String jobId) {
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project).jobIds(Lists.newArrayList(jobId))
+ .build();
+ List<JobInfo> jobInfoList =
jobInfoMapper.selectByJobFilter(jobMapperFilter);
+ if (CollectionUtils.isEmpty(jobInfoList) || jobInfoList.size() != 1) {
+ return null;
+ }
+ return JobInfoUtil.deserializeExecutablePO(jobInfoList.get(0));
+ }
+
public List<ExecutablePO> getExecutablePoByStatus(String project,
List<String> jobIds,
List<ExecutableState> filterStatuses) {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
- jobMapperFilter.setStatuses(filterStatuses);
- jobMapperFilter.setJobIds(jobIds);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project).statuses(filterStatuses)
+ .jobIds(jobIds).build();
List<JobInfo> jobInfoList =
jobInfoMapper.selectByJobFilter(jobMapperFilter);
if (CollectionUtils.isEmpty(jobInfoList)) {
return new ArrayList<>();
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 874f36722a..c5fcf4d6ec 100644
---
a/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++
b/src/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -1103,8 +1103,7 @@ public class ExecutableManager {
if (task instanceof ChainedStageExecutable) {
final ChainedStageExecutable stageExecutable =
(ChainedStageExecutable) task;
Map<String, List<StageExecutable>> stageMap = Optional
- .ofNullable(stageExecutable.getStagesMap())
- .orElse(Maps.newHashMap());
+
.ofNullable(stageExecutable.getStagesMap()).orElse(Maps.newHashMap());
val taskStartTime = task.getStartTime();
for (Map.Entry<String, List<StageExecutable>> entry :
stageMap.entrySet()) {
final String segmentId = entry.getKey();
@@ -1287,8 +1286,7 @@ public class ExecutableManager {
}
private List<ExecutablePO> getExecutablePOByModelId(String project, String
modelId) {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project).build();
if (null != modelId) {
jobMapperFilter.setModelIds(Lists.newArrayList(modelId));
}
@@ -1385,8 +1383,7 @@ public class ExecutableManager {
}
public List<ExecutablePO> getExecutablePOsByStatus(List<String> jobIds,
List<ExecutableState> executableStates) {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project).build();
if (CollectionUtils.isNotEmpty(jobIds)) {
jobMapperFilter.setJobIds(jobIds);
}
@@ -1411,16 +1408,6 @@ public class ExecutableManager {
return getExecutablesByStatus(null, Lists.newArrayList(status));
}
- public List<AbstractExecutable> getExecutablesByJobType(Set<JobTypeEnum>
RELATED_JOBS) {
- List<String> jobTypeNames = RELATED_JOBS.stream().map(jobTypeEnum ->
jobTypeEnum.name())
- .collect(Collectors.toList());
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setJobNames(jobTypeNames);
- List<JobInfo> jobInfoList =
jobInfoDao.getJobInfoListByFilter(jobMapperFilter);
- return jobInfoList.stream().map(jobInfo ->
JobInfoUtil.deserializeExecutablePO(jobInfo)).map(this::fromPO)
- .collect(Collectors.toList());
- }
-
public ExecutablePO getExecutablePO(String jobId) {
return jobInfoDao.getExecutablePOByUuid(jobId);
}
@@ -1672,15 +1659,14 @@ public class ExecutableManager {
if (StringUtils.isNotBlank(model)) {
return listExecByModelAndStatus(model, ExecutableState::isRunning,
null);
} else {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
List<ExecutableState> runningStates = Lists.newArrayList();
for (ExecutableState executableState : ExecutableState.values()) {
if (executableState.isRunning()) {
runningStates.add(executableState);
}
}
- jobMapperFilter.setStatuses(runningStates);
- jobMapperFilter.setProject(project);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project).statuses(runningStates)
+ .build();
return jobInfoDao.getJobInfoListByFilter(jobMapperFilter).stream()
.map(jobInfo ->
JobInfoUtil.deserializeExecutablePO(jobInfo)).map(this::fromPO)
.collect(Collectors.toList());
@@ -1752,9 +1738,8 @@ public class ExecutableManager {
}
public void checkSuicideJobOfModel(String project, String modelId) {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
- jobMapperFilter.setModelIds(Lists.newArrayList(modelId));
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project)
+ .modelIds(Lists.newArrayList(modelId)).build();
jobMapperFilter.setStatuses(ExecutableState.ERROR,
ExecutableState.PAUSED);
List<JobInfo> errorJobInfoList =
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
.fetchJobsByFilter(jobMapperFilter);
@@ -2013,12 +1998,10 @@ public class ExecutableManager {
}
public List<AbstractExecutable>
getNotFinalExecutablesByType(List<JobTypeEnum> jobTypeEnums) {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
- jobMapperFilter.setStatuses(ExecutableState.getNotFinalStates());
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project)
+ .statuses(ExecutableState.getNotFinalStates()).build();
if (CollectionUtils.isNotEmpty(jobTypeEnums)) {
- jobMapperFilter.setJobNames(
- jobTypeEnums.stream().map(jobTypeEnum ->
jobTypeEnum.name()).collect(Collectors.toList()));
+
jobMapperFilter.setJobNames(jobTypeEnums.stream().map(Enum::name).collect(Collectors.toList()));
}
List<JobInfo> jobInfoList =
jobInfoDao.getJobInfoListByFilter(jobMapperFilter);
return jobInfoList.stream().map(jobInfo ->
fromPO(JobInfoUtil.deserializeExecutablePO(jobInfo)))
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java
b/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java
index b907a5a070..e0db3d8ae3 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/rest/JobMapperFilter.java
@@ -58,6 +58,8 @@ public class JobMapperFilter {
private String project;
+ private List<String> projects;
+
private String orderByFiled;
private String orderType;
@@ -76,8 +78,32 @@ public class JobMapperFilter {
public void setStatuses(List<ExecutableState> stateList) {
statuses = stateList;
}
-
+
public void setStatuses(ExecutableState... states) {
statuses = Lists.newArrayList(states);
}
+
+ public void setProjects(List<String> projects) {
+ this.project = null;
+ this.projects = projects;
+ }
+
+ public void setProject(String project) {
+ this.project = project;
+ this.projects = null;
+ }
+
+ public static class JobMapperFilterBuilder {
+ public JobMapperFilterBuilder projects(List<String> projects) {
+ this.project = null;
+ this.projects = projects;
+ return this;
+ }
+
+ public JobMapperFilterBuilder project(String project) {
+ this.project = project;
+ this.projects = null;
+ return this;
+ }
+ }
}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
index a97f34f49c..ec9e494e35 100644
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
@@ -124,8 +124,8 @@ public class JobCheckRunner implements Runnable {
}
private void markSuicideForErrorOrPausedJobs() {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setStatuses(Lists.newArrayList(ExecutableState.ERROR,
ExecutableState.PAUSED));
+ JobMapperFilter jobMapperFilter = JobMapperFilter.builder()
+ .statuses(Lists.newArrayList(ExecutableState.ERROR,
ExecutableState.PAUSED)).build();
jobMapperFilter.setLimit(10);
jobMapperFilter.setOffset(0);
List<JobInfo> jobInfoList =
jobContext.getJobInfoMapper().selectByJobFilter(jobMapperFilter);
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
index 57d7903b4f..8414513bfc 100644
---
a/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
+++
b/src/core-job/src/main/java/org/apache/kylin/job/scheduler/JdbcJobScheduler.java
@@ -360,10 +360,9 @@ public class JdbcJobScheduler implements JobScheduler {
}
private List<JobInfo> getProcessingJobInfoWithOrder() {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().orderByFiled("priority,create_time")
+ .orderType("ASC").build();
jobMapperFilter.setStatuses(ExecutableState.READY,
ExecutableState.PENDING, ExecutableState.RUNNING);
- jobMapperFilter.setOrderByFiled("priority,create_time");
- jobMapperFilter.setOrderType("ASC");
return
jobContext.getJobInfoMapper().selectByJobFilter(jobMapperFilter);
}
@@ -407,12 +406,11 @@ public class JdbcJobScheduler implements JobScheduler {
private void releaseExpiredLock() {
int batchSize =
jobContext.getKylinConfig().getJobSchedulerMasterPollBatchSize();
- JobMapperFilter filter = new JobMapperFilter();
List<String> jobIds =
jobContext.getJobLockMapper().findExpiredORNonLockIdList(batchSize);
if (jobIds.isEmpty()) {
return;
}
- filter.setJobIds(jobIds);
+ JobMapperFilter filter =
JobMapperFilter.builder().jobIds(jobIds).build();
List<JobInfo> jobs =
jobContext.getJobInfoMapper().selectByJobFilter(filter);
List<String> jobInfoIds =
jobs.stream().map(JobInfo::getJobId).collect(Collectors.toList());
List<String> toRemoveLocks =
Lists.newArrayList(jobIds).stream().filter(jobId -> !jobInfoIds.contains(jobId))
diff --git a/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml
b/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml
index a51f449e0f..ad6e056c11 100644
--- a/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml
+++ b/src/core-job/src/main/resources/mybatis-mapper/JobInfoMapper.xml
@@ -214,6 +214,11 @@
<if test="project != null">
project = #{project}
</if>
+ <if test="projects != null">
+ <foreach close=")" collection="projects" index="index" item="item"
open="project in (" separator=",">
+ #{item}
+ </foreach>
+ </if>
<if test="queryStartTime != null">
AND update_time >= #{queryStartTime}
</if>
@@ -275,6 +280,11 @@
<if test="project != null">
project = #{project}
</if>
+ <if test="projects != null">
+ <foreach close=")" collection="projects" index="index" item="item"
open="project in (" separator=",">
+ #{item}
+ </foreach>
+ </if>
<if test="queryStartTime != null">
AND update_time >= #{queryStartTime}
</if>
diff --git
a/src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java
b/src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java
new file mode 100644
index 0000000000..3accdcc1b1
--- /dev/null
+++ b/src/core-job/src/test/java/org/apache/kylin/job/dao/JobInfoDaoTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.util.List;
+
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.job.domain.JobInfo;
+import org.apache.kylin.job.mapper.JobInfoMapper;
+import org.apache.kylin.job.rest.JobMapperFilter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class JobInfoDaoTest {
+
+ private JobInfoMapper jobInfoMapper;
+ private JobInfoDao jobInfoDao;
+
+ @Before
+ public void setUp() {
+ jobInfoMapper = Mockito.mock(JobInfoMapper.class);
+ jobInfoDao = new JobInfoDao();
+ ReflectionTestUtils.setField(jobInfoDao, "jobInfoMapper",
jobInfoMapper);
+ }
+
+ private JobInfo mockJobInfo(String jobId) {
+ JobInfo jobInfo = new JobInfo();
+ jobInfo.setJobId(jobId);
+ return jobInfo;
+ }
+
+ @Test
+ public void
testGetJobInfoListByProjectFilterReturnsEmptyWhenNoProjectScope() {
+ JobMapperFilter filter = JobMapperFilter.builder().build();
+
+ List<JobInfo> result =
jobInfoDao.getJobInfoListByProjectFilter(filter);
+
+ Assert.assertTrue(result.isEmpty());
+ // must short-circuit without hitting the mapper to avoid a cross-project scan
+ Mockito.verify(jobInfoMapper,
Mockito.never()).selectByJobFilter(ArgumentMatchers.any());
+ }
+
+ @Test
+ public void
testGetJobInfoListByProjectFilterReturnsEmptyWhenProjectIsBlank() {
+ JobMapperFilter filter = JobMapperFilter.builder().project("
").build();
+
+ List<JobInfo> result =
jobInfoDao.getJobInfoListByProjectFilter(filter);
+
+ Assert.assertTrue(result.isEmpty());
+ Mockito.verify(jobInfoMapper,
Mockito.never()).selectByJobFilter(ArgumentMatchers.any());
+ }
+
+ @Test
+ public void
testGetJobInfoListByProjectFilterDelegatesWhenSingleProjectPresent() {
+ JobMapperFilter filter =
JobMapperFilter.builder().project("default").build();
+ List<JobInfo> expected = Lists.newArrayList(mockJobInfo("job-1"));
+
Mockito.when(jobInfoMapper.selectByJobFilter(filter)).thenReturn(expected);
+
+ List<JobInfo> result =
jobInfoDao.getJobInfoListByProjectFilter(filter);
+
+ Assert.assertSame(expected, result);
+ Mockito.verify(jobInfoMapper,
Mockito.times(1)).selectByJobFilter(filter);
+ }
+
+ @Test
+ public void
testGetJobInfoListByProjectFilterDelegatesWhenProjectsPresent() {
+ JobMapperFilter filter =
JobMapperFilter.builder().projects(Lists.newArrayList("p1", "p2")).build();
+ List<JobInfo> expected = Lists.newArrayList(mockJobInfo("job-1"),
mockJobInfo("job-2"));
+
Mockito.when(jobInfoMapper.selectByJobFilter(filter)).thenReturn(expected);
+
+ List<JobInfo> result =
jobInfoDao.getJobInfoListByProjectFilter(filter);
+
+ Assert.assertEquals(2, result.size());
+ Mockito.verify(jobInfoMapper,
Mockito.times(1)).selectByJobFilter(filter);
+ }
+
+ @Test
+ public void testGetJobInfoListByFilterAlwaysDelegates() {
+ JobMapperFilter filter = JobMapperFilter.builder().build();
+ List<JobInfo> expected = Lists.newArrayList(mockJobInfo("job-1"));
+
Mockito.when(jobInfoMapper.selectByJobFilter(filter)).thenReturn(expected);
+
+ List<JobInfo> result = jobInfoDao.getJobInfoListByFilter(filter);
+
+ Assert.assertSame(expected, result);
+ Mockito.verify(jobInfoMapper,
Mockito.times(1)).selectByJobFilter(filter);
+ }
+}
diff --git
a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
index c83bac84dd..c10f2de403 100644
---
a/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
+++
b/src/core-job/src/test/java/org/apache/kylin/job/scheduler/JdbcJobSchedulerTest.java
@@ -106,7 +106,7 @@ class JdbcJobSchedulerTest extends AbstractTestCase {
for (int i = 0; i < 3; i++) {
mockJob();
}
- JobMapperFilter filter = new JobMapperFilter();
+ JobMapperFilter filter = JobMapperFilter.builder().build();
filter.setStatuses(ExecutableState.RUNNING);
await().atMost(5, TimeUnit.SECONDS).until(() ->
jobInfoDao.getJobInfoListByFilter(filter).size() == 3);
Assertions.assertEquals(secondJobContext.getJobScheduler().getRunningJob().size()
diff --git
a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java
b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 7080bc4880..08c8071f02 100644
---
a/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++
b/src/data-loading-server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -210,9 +210,9 @@ public class JobController extends BaseController {
@ResponseBody
public EnvelopeResponse<List<ExecutableStepResponse>>
getJobDetail(@PathVariable(value = "job_id") String jobId,
@RequestParam(value = "project") String project) {
- checkProjectName(project);
+ String projectName = checkProjectName(project);
checkRequiredArg(JOB_ID_ARG_NAME, jobId);
- return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
jobInfoService.getJobDetail(project, jobId), "");
+ return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
jobInfoService.getJobDetail(projectName, jobId), "");
}
@ApiOperation(value = "updateJobStatus", tags = {
@@ -446,7 +446,7 @@ public class JobController extends BaseController {
}
private Set<String> getLogicalViewMetaDumpList(KylinConfig config, String
project, String viewTable,
- String modelId) {
+ String modelId) {
Set<String> dumpList = new LinkedHashSet<>();
if (!config.isDDLLogicalViewEnabled()) {
return dumpList;
diff --git
a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
index b276cc81ce..231c5e8a15 100644
---
a/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
+++
b/src/data-loading-server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java
@@ -257,6 +257,8 @@ public class JobControllerTest extends
NLocalFileMetadataTestCase {
.andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
Mockito.verify(jobController).getJobDetail("e1ad7bb0-522e-456a-859d-2eab1df448de",
"default");
+ // the resolved (normalized) project name from checkProjectName is forwarded to the service
+ Mockito.verify(jobInfoService).getJobDetail("default",
"e1ad7bb0-522e-456a-859d-2eab1df448de");
}
private List<ExecutableStepResponse> mockStepsResponse() {
diff --git
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java
index 4b45be394e..071f1f0013 100644
---
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java
+++
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobInfoService.java
@@ -250,13 +250,12 @@ public class JobInfoService extends BasicService
implements JobSupporter {
// TODO model == null || !model.isFusionModel();
public List<ExecutableResponse> listJobs(final JobFilter jobFilter, int
offset, int limit) {
- // TODO check permission when 'project' is empty
if (StringUtils.isNotEmpty(jobFilter.getProject())) {
aclEvaluate.checkProjectOperationPermission(jobFilter.getProject());
}
JobMapperFilter jobMapperFilter =
JobFilterUtil.getJobMapperFilter(jobFilter, offset, limit, modelService,
tableExtService, projectService);
- List<JobInfo> jobInfoList =
jobInfoDao.getJobInfoListByFilter(jobMapperFilter);
+ List<JobInfo> jobInfoList =
jobInfoDao.getJobInfoListByProjectFilter(jobMapperFilter);
List<ExecutableResponse> result =
jobInfoList.stream().map(JobInfoUtil::deserializeExecutablePO)
.map(executablePO -> {
AbstractExecutable executable =
getManager(ExecutableManager.class, executablePO.getProject())
@@ -264,6 +263,7 @@ public class JobInfoService extends BasicService implements
JobSupporter {
val convert = this.convert(executable, executablePO);
return convert;
}).collect(Collectors.toList());
+
sortByDurationIfNeed(result, jobFilter.getSortBy(),
jobMapperFilter.getOrderType());
return result;
}
@@ -287,7 +287,6 @@ public class JobInfoService extends BasicService implements
JobSupporter {
}
public long countJobs(final JobFilter jobFilter) {
- // TODO check permission when 'project' is empty
if (StringUtils.isNotEmpty(jobFilter.getProject())) {
aclEvaluate.checkProjectOperationPermission(jobFilter.getProject());
}
@@ -298,7 +297,7 @@ public class JobInfoService extends BasicService implements
JobSupporter {
public List<ExecutableStepResponse> getJobDetail(String project, String
jobId) {
aclEvaluate.checkProjectOperationPermission(project);
- ExecutablePO executablePO = jobInfoDao.getExecutablePOByUuid(jobId);
+ ExecutablePO executablePO =
jobInfoDao.getExecutablePoByUuidWithProject(project, jobId);
if (executablePO == null) {
throw new KylinException(JOB_NOT_EXIST, jobId);
}
@@ -373,8 +372,7 @@ public class JobInfoService extends BasicService implements
JobSupporter {
// table sampling and snapshot table don't have some
segment
if (!StringUtils.equals(task.getId(), segmentId)) {
setSegmentSubStageParams(project, targetSubject, task,
segmentId, segmentSubStages,
- stageExecutables,
- stageResponses, waiteTimeMap,
output.getState(), executablePO);
+ stageExecutables, stageResponses,
waiteTimeMap, output.getState(), executablePO);
stringSubStageMap.put(segmentId, segmentSubStages);
}
}
@@ -474,6 +472,7 @@ public class JobInfoService extends BasicService implements
JobSupporter {
updateJobStatus(job.getId(), executablePO, project, action);
return getJobInstance(job.getId());
}
+
private void jobActionValidate(String action) {
JobActionEnum.validateValue(action.toUpperCase(Locale.ROOT));
}
@@ -995,9 +994,7 @@ public class JobInfoService extends BasicService implements
JobSupporter {
return Lists.newArrayList();
}
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(project);
- jobMapperFilter.setModelIds(batchModelIds);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(project).modelIds(batchModelIds).build();
List<ExecutableState> ignoreStates =
Lists.newArrayList(ExecutableState.SUCCEED, ExecutableState.ERROR,
ExecutableState.DISCARDED, ExecutableState.SUICIDAL);
List<ExecutableState> states = Arrays.stream(ExecutableState.values())
@@ -1007,6 +1004,7 @@ public class JobInfoService extends BasicService
implements JobSupporter {
return jobInfoList.stream().map(jobInfo ->
jobInfo.getJobId()).collect(Collectors.toList());
}
+ @Override
@Transaction(project = 0)
public void stopBatchJob(String project, TableDesc tableDesc) {
List<String> fusionModelIds = getFusionModelsByTableDesc(project,
tableDesc);
diff --git
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java
index 2c831af7ee..9e4000ece7 100644
---
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java
+++
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobResourceService.java
@@ -96,10 +96,8 @@ public class JobResourceService {
val jobInfoDao = JobContextUtil.getJobInfoDao(config);
Set<String> queues = Sets.newHashSet();
projects.forEach(projectInstance -> {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setProject(projectInstance.getName());
- jobMapperFilter.setStatuses(ExecutableState.SUCCEED);
- jobMapperFilter.setLimit(10);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().project(projectInstance.getName())
+
.statuses(Lists.newArrayList(ExecutableState.SUCCEED)).limit(10).build();
val jobs = jobInfoDao.getJobInfoListByFilter(jobMapperFilter);
if (CollectionUtils.isNotEmpty(jobs)) {
for (JobInfo jobInfo : jobs) {
diff --git
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 0e577dadf9..51da68a86c 100644
---
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -244,8 +244,7 @@ public class JobService extends BasicService {
}
public String getProjectByJobId(String jobId) {
- JobMapperFilter jobMapperFilter = new JobMapperFilter();
- jobMapperFilter.setJobId(jobId);
+ JobMapperFilter jobMapperFilter =
JobMapperFilter.builder().jobId(jobId).build();
List<JobInfo> jobInfoList =
jobInfoDao.getJobInfoListByFilter(jobMapperFilter);
if (CollectionUtils.isEmpty(jobInfoList)) {
return null;
diff --git
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java
index a744384089..8e6d56432e 100644
---
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java
+++
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/util/JobFilterUtil.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.util;
import static
org.apache.kylin.common.exception.ServerErrorCode.INVALID_PARAMETER;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
@@ -48,8 +49,11 @@ import org.apache.kylin.rest.service.ProjectService;
import org.apache.kylin.rest.service.TableExtService;
import org.sparkproject.guava.collect.Lists;
+import lombok.SneakyThrows;
+
public class JobFilterUtil {
+ @SneakyThrows(IOException.class)
public static JobMapperFilter getJobMapperFilter(final JobFilter
jobFilter, int offset, int limit,
ModelService modelService, TableExtService tableExtService,
ProjectService projectService) {
Date queryStartTime = getQueryStartTime(jobFilter.getTimeFilter());
@@ -90,9 +94,21 @@ public class JobFilterUtil {
.forEach(jobStatus ->
scheduleStates.addAll(JobStatusUtil.mapJobStatusToScheduleState(jobStatus)));
}
- return new JobMapperFilter(scheduleStates, jobFilter.getJobNames(),
queryStartTime.getTime(),
- Lists.newArrayList(subjects), null, jobId, null,
jobFilter.getProject(), orderByField, orderType,
- offset, limit, null, null);
+ JobMapperFilter.JobMapperFilterBuilder filterBuilder =
JobMapperFilter.builder().statuses(scheduleStates)
+
.jobNames(jobFilter.getJobNames()).queryStartTime(queryStartTime.getTime())
+
.subjects(Lists.newArrayList(subjects)).jobId(jobId).project(jobFilter.getProject())
+
.orderByFiled(orderByField).orderType(orderType).offset(offset).limit(limit);
+
+ if (StringUtils.isEmpty(jobFilter.getProject())) {
+ List<String> projects = projectService
+
.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false,
+ AclPermissionEnum.OPERATION)
+ .stream().map(projectAndPermission ->
projectAndPermission.getProject().getName())
+ .collect(Collectors.toList());
+ return filterBuilder.projects(projects).build();
+ }
+
+ return filterBuilder.build();
}
private static Date getQueryStartTime(int timeFilter) {
diff --git
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java
index d921bc2a10..bc5e46210d 100644
---
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java
+++
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/JobInfoServiceTest.java
@@ -23,9 +23,14 @@ import static
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_STATUS_
import static
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_UPDATE_STATUS_FAILED;
import static org.apache.kylin.job.constant.JobStatusEnum.PENDING;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.File;
@@ -48,7 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.ErrorCode;
@@ -64,6 +69,7 @@ import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.job.NSparkExecutable;
import org.apache.kylin.engine.spark.job.NTableSamplingJob;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.dao.ExecutableOutputPO;
@@ -90,16 +96,18 @@ import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.request.JobUpdateRequest;
import org.apache.kylin.rest.response.ExecutableResponse;
import org.apache.kylin.rest.response.ExecutableStepResponse;
+import org.apache.kylin.rest.response.UserProjectPermissionResponse;
+import org.apache.kylin.rest.security.AclPermissionEnum;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclUtil;
import org.apache.spark.application.NoRetryException;
import org.assertj.core.api.Assertions;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -114,7 +122,7 @@ import lombok.var;
public class JobInfoServiceTest extends LogOutputTestCase {
- String project = "default";
+ private static final String DEFAULT_PROJECT = "default";
private JobInfoService jobInfoService = Mockito.spy(JobInfoService.class);
@@ -168,21 +176,22 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
getTestConfig().setProperty("kylin.streaming.enabled", "false");
// test size
List<String> jobNames = Lists.newArrayList();
- JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4,
"", "", false, "default", "", true);
+ JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4,
"", "", false, DEFAULT_PROJECT, "",
+ true);
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(3, jobs.size());
+ assertEquals(3, jobs.size());
jobInfoService.addOldParams(jobs);
jobFilter.setSubject("");
jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW));
jobFilter.setTimeFilter(1);
List<ExecutableResponse> jobs4 = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(2, jobs4.size());
+ assertEquals(2, jobs4.size());
jobFilter.setSubject("");
jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW,
JobStatusEnum.FINISHED));
jobFilter.setTimeFilter(1);
jobs4 = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(3, jobs4.size());
+ assertEquals(3, jobs4.size());
jobFilter.setStatuses(Lists.newArrayList());
jobFilter.setTimeFilter(3);
@@ -197,26 +206,26 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
maxDuration = duration;
}
}
- Assert.assertTrue(jobs7.size() == 3 && jobs7.get(0).getDuration() ==
maxDuration);
+ assertTrue(jobs7.size() == 3 && jobs7.get(0).getDuration() ==
maxDuration);
jobFilter.setSortBy("create_time");
jobFilter.setReverse(true);
List<ExecutableResponse> jobs8 = jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(jobs8.size() == 3 &&
jobs8.get(0).getId().equals("sparkjob3"));
+ assertTrue(jobs8.size() == 3 &&
jobs8.get(0).getId().equals("sparkjob3"));
jobFilter.setReverse(false);
jobFilter.setStatuses(Lists.newArrayList());
jobFilter.setSortBy("");
List<ExecutableResponse> jobs10 = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(3, jobs10.size());
+ assertEquals(3, jobs10.size());
jobFilter.setSortBy("job_status");
List<ExecutableResponse> jobs11 = jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(jobs11.size() == 3 &&
jobs11.get(2).getId().equals("sparkjob1"));
+ assertTrue(jobs11.size() == 3 &&
jobs11.get(2).getId().equals("sparkjob1"));
jobFilter.setSortBy("create_time");
List<ExecutableResponse> jobs12 = jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(jobs12.size() == 3 &&
jobs12.get(0).getId().equals("sparkjob1"));
+ assertTrue(jobs12.size() == 3 &&
jobs12.get(0).getId().equals("sparkjob1"));
jobFilter.setSortBy("target_subject");
for (ExecutablePO job : mockJobs) {
@@ -226,7 +235,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
});
}
List<ExecutableResponse> sortJobs2 =
jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(sortJobs2.size() == 3 &&
sortJobs2.get(0).getId().equals("sparkjob1"));
+ assertTrue(sortJobs2.size() == 3 &&
sortJobs2.get(0).getId().equals("sparkjob1"));
for (ExecutablePO job : mockJobs) {
jobInfoDao.updateJob(job.getUuid(), jobUpdater -> {
jobUpdater.setJobType(JobTypeEnum.TABLE_SAMPLING);
@@ -234,7 +243,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
});
}
List<ExecutableResponse> sortJobs3 =
jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(sortJobs3.size() == 3 &&
sortJobs3.get(0).getId().equals("sparkjob1"));
+ assertTrue(sortJobs3.size() == 3 &&
sortJobs3.get(0).getId().equals("sparkjob1"));
for (ExecutablePO job : mockJobs) {
jobInfoDao.updateJob(job.getUuid(), jobUpdater -> {
jobUpdater.setJobType(JobTypeEnum.SNAPSHOT_BUILD);
@@ -248,7 +257,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
}
jobFilter.setSortBy("job_status");
List<ExecutableResponse> sortJobs4 =
jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(sortJobs4.size() == 3 &&
sortJobs4.get(2).getId().equals("sparkjob2"));
+ assertTrue(sortJobs4.size() == 3 &&
sortJobs4.get(2).getId().equals("sparkjob2"));
for (ExecutablePO job : mockJobs) {
jobInfoDao.updateJob(job.getUuid(), jobUpdater -> {
jobUpdater.setJobType(JobTypeEnum.SNAPSHOT_REFRESH);
@@ -261,7 +270,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
});
}
List<ExecutableResponse> sortJobs5 =
jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(sortJobs5.size() == 3 &&
sortJobs5.get(0).getId().equals("sparkjob1"));
+ assertTrue(sortJobs5.size() == 3 &&
sortJobs5.get(0).getId().equals("sparkjob1"));
jobFilter.setSortBy("total_time");
assertKylinExeption(() -> {
@@ -270,20 +279,20 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
jobFilter.setSortBy("create_time");
List<ExecutableResponse> jobs13 = jobInfoService.listJobs(jobFilter,
0, 10);
- Assert.assertEquals(3, jobs13.size());
+ assertEquals(3, jobs13.size());
String jobId = jobs13.get(0).getId();
for (ExecutablePO job : mockJobs) {
job.setJobType(JobTypeEnum.TABLE_SAMPLING);
}
jobFilter.setKey(jobId);
List<ExecutableResponse> jobs14 = jobInfoService.listJobs(jobFilter,
0, 10);
- Assert.assertTrue(jobs14.size() == 1 &&
jobs14.get(0).getId().equals(jobId));
+ assertTrue(jobs14.size() == 1 && jobs14.get(0).getId().equals(jobId));
jobFilter.setStatuses(Lists.newArrayList());
List<ExecutableResponse> jobs15 = jobInfoService.listJobs(jobFilter,
0, 10);
assertEquals(1, jobs15.size());
jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW));
List<ExecutableResponse> jobs16 = jobInfoService.listJobs(jobFilter,
0, 10);
- assertEquals(0, jobs16.size());
+ assertTrue(jobs16.isEmpty());
}
private List<ExecutablePO> mockDetailJobs(boolean random) throws Exception
{
@@ -304,10 +313,6 @@ public class JobInfoServiceTest extends LogOutputTestCase {
return jobs;
}
- private String getProject() {
- return "default";
- }
-
private long getCreateTime(String name) {
switch (name) {
case "1":
@@ -325,7 +330,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
ExecutablePO mockJob = new ExecutablePO();
mockJob.setType("org.apache.kylin.job.execution.SucceedChainedTestExecutable");
mockJob.setJobType(JobTypeEnum.INDEX_BUILD);
- mockJob.setProject(getProject());
+ mockJob.setProject(DEFAULT_PROJECT);
mockJob.setUuid("sparkjob" + name);
mockJob.setTargetModel("model" + name);
val jobOutput = mockJob.getOutput();
@@ -342,7 +347,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
val childExecutable = new ExecutablePO();
childExecutable.setUuid(mockJob.getId() + "_0" + i);
childExecutable.setType("org.apache.kylin.job.execution.SucceedSubTaskTestExecutable");
- childExecutable.setProject(getProject());
+ childExecutable.setProject(DEFAULT_PROJECT);
val jobChildOutput = childExecutable.getOutput();
mockOutputTime(random, lastEndTime, jobChildOutput, i);
lastEndTime = jobChildOutput.getEndTime();
@@ -383,8 +388,8 @@ public class JobInfoServiceTest extends LogOutputTestCase {
.collect(Collectors.toList());
List<Long> copyDurationList = new ArrayList<>(totalDurationArrays);
copyDurationList.sort(Collections.reverseOrder());
- Assert.assertEquals(3, copyDurationList.size());
- Assert.assertEquals(totalDurationArrays, copyDurationList);
+ assertEquals(3, copyDurationList.size());
+ assertEquals(totalDurationArrays, copyDurationList);
}
for (JobInfo jobInfo : mockJobs) {
@@ -397,7 +402,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0,
"", "default", false, "default", "",
false);
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(0, jobs.size());
+ assertEquals(0, jobs.size());
}
@Test
@@ -415,7 +420,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
List<String> jobNames = Lists.newArrayList();
JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 4,
"", "", false, "default", "", true);
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
- Assert.assertTrue(jobs.get(0).getCreateTime() > 0);
+ assertTrue(jobs.get(0).getCreateTime() > 0);
}
private void addSegment(AbstractExecutable job) {
@@ -428,13 +433,13 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
public void testGetTargetSubjectAndJobType() {
ExecutableManager manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), "default");
SucceedChainedTestExecutable job1 = new SucceedChainedTestExecutable();
- job1.setProject(getProject());
+ job1.setProject(DEFAULT_PROJECT);
job1.setName("mocked job");
job1.setTargetSubject("12345678");
job1.setJobType(JobTypeEnum.INDEX_BUILD);
- final TableDesc tableDesc =
NTableMetadataManager.getInstance(getTestConfig(), getProject())
+ final TableDesc tableDesc =
NTableMetadataManager.getInstance(getTestConfig(), DEFAULT_PROJECT)
.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
- NTableSamplingJob samplingJob =
NTableSamplingJob.internalCreate(tableDesc, getProject(), "ADMIN", 20000);
+ NTableSamplingJob samplingJob =
NTableSamplingJob.internalCreate(tableDesc, DEFAULT_PROJECT, "ADMIN", 20000);
manager.addJob(job1);
manager.addJob(samplingJob);
List<String> jobNames = Lists.newArrayList();
@@ -442,20 +447,20 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
jobFilter.setSortBy("create_time");
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals("The model is deleted",
jobs.get(0).getTargetSubject()); // no target model so it's null
- Assert.assertEquals("mocked job", jobs.get(0).getJobName());
- Assert.assertEquals(tableDesc.getIdentity(),
jobs.get(1).getTargetSubject());
- Assert.assertEquals("TABLE_SAMPLING", jobs.get(1).getJobName());
+ assertEquals("The model is deleted", jobs.get(0).getTargetSubject());
// no target model, so the subject falls back to "The model is deleted"
+ assertEquals("mocked job", jobs.get(0).getJobName());
+ assertEquals(tableDesc.getIdentity(), jobs.get(1).getTargetSubject());
+ assertEquals("TABLE_SAMPLING", jobs.get(1).getJobName());
}
@Test
public void testJobnameResponse() throws Exception {
- ExecutableManager manager =
Mockito.spy(ExecutableManager.getInstance(getTestConfig(), getProject()));
+ ExecutableManager manager =
Mockito.spy(ExecutableManager.getInstance(getTestConfig(), DEFAULT_PROJECT));
ConcurrentHashMap<Class, ConcurrentHashMap<String, Object>>
managersByPrjCache = NLocalFileMetadataTestCase
.getInstanceByProject();
- managersByPrjCache.get(ExecutableManager.class).put(getProject(),
manager);
+ managersByPrjCache.get(ExecutableManager.class).put(DEFAULT_PROJECT,
manager);
ExecutablePO job1 = Mockito.spy(ExecutablePO.class);
- job1.setProject(getProject());
+ job1.setProject(DEFAULT_PROJECT);
job1.setUuid("sparkjob1");
job1.setTargetModel("model1");
job1.setJobType(JobTypeEnum.INC_BUILD);
@@ -464,7 +469,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
subJob.setType("org.apache.kylin.job.execution.SucceedChainedTestExecutable");
subJob.setJobType(JobTypeEnum.INC_BUILD);
subJob.getOutput().setStatus("SUCCEED");
- subJob.setProject(getProject());
+ subJob.setProject(DEFAULT_PROJECT);
subJob.setUuid(job1.getId() + "_00");
job1.setTasks(Lists.newArrayList(subJob));
manager.addJob(job1);
@@ -477,11 +482,11 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
false);
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(2, jobs.size());
+ assertEquals(2, jobs.size());
ExecutableResponse executableResponse = jobs.get(0);
- Assert.assertEquals("sparkjob1", executableResponse.getId());
+ assertEquals("sparkjob1", executableResponse.getId());
}
@@ -499,15 +504,15 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
List<String> jobNames = Lists.newArrayList();
JobFilter jobFilter = new JobFilter(Lists.newArrayList(), jobNames, 0,
"", "def", false, "default", "", false);
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
- Assert.assertEquals(0, jobs.size());
+ assertEquals(0, jobs.size());
JobFilter jobFilter2 = new JobFilter(Lists.newArrayList(), jobNames,
0, "", "def", true, "default", "", false);
List<ExecutableResponse> jobs2 = jobInfoService.listJobs(jobFilter2);
- Assert.assertEquals(0, jobs2.size());
+ assertEquals(0, jobs2.size());
JobFilter jobFilter3 = new JobFilter(Lists.newArrayList(), jobNames,
0, "", null, true, "default", "", false);
List<ExecutableResponse> jobs3 = jobInfoService.listJobs(jobFilter3);
- Assert.assertEquals(3, jobs3.size());
+ assertEquals(3, jobs3.size());
}
@Test
@@ -527,7 +532,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
String segmentId2 = RandomUtil.randomUUIDStr();
String segmentIds = segmentId + "," + segmentId2;
- ExecutableManager manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), project);
+ ExecutableManager manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), DEFAULT_PROJECT);
SucceedChainedTestExecutable executable = new
SucceedChainedTestExecutable();
executable.setId(RandomUtil.randomUUIDStr());
executable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
@@ -565,7 +570,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
manager.updateStageStatus(logicStep.getId(), segmentId,
ExecutableState.RUNNING, null, "test output");
- List<ExecutableStepResponse> jobDetail =
jobInfoService.getJobDetail(project, executable.getId());
+ List<ExecutableStepResponse> jobDetail =
jobInfoService.getJobDetail(DEFAULT_PROJECT, executable.getId());
assertEquals(1, jobDetail.size());
ExecutableStepResponse executableStepResponse = jobDetail.get(0);
checkResponse(executableStepResponse, sparkExecutable.getId(), null);
@@ -587,7 +592,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
manager.updateStageStatus(logicStep.getId(), null,
ExecutableState.SUCCEED, null, "test output");
- jobDetail = jobInfoService.getJobDetail(project, executable.getId());
+ jobDetail = jobInfoService.getJobDetail(DEFAULT_PROJECT,
executable.getId());
assertEquals(1, jobDetail.size());
executableStepResponse = jobDetail.get(0);
checkResponse(executableStepResponse, sparkExecutable.getId(), null);
@@ -626,7 +631,118 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
executable.setJobType(JobTypeEnum.INC_BUILD);
manager.addJob(executable);
List<ExecutableStepResponse> result =
jobInfoService.getJobDetail("default", executable.getId());
- Assert.assertEquals(1, result.size());
+ assertEquals(1, result.size());
+ }
+
+ @Test
+ public void testGetExecutablePoByUuid() throws Exception {
+ ExecutablePO po = jobInfoDao.addJob(mockExecutablePO(false, "1"));
+
+ // found when both project and jobId match
+ ExecutablePO found =
jobInfoDao.getExecutablePoByUuidWithProject(DEFAULT_PROJECT, po.getId());
+ assertNotNull(found);
+ assertEquals(po.getId(), found.getId());
+
+ // not found when jobId does not exist
+
assertNull(jobInfoDao.getExecutablePoByUuidWithProject(DEFAULT_PROJECT,
"not_exist_job_id"));
+
+ // not found when the job belongs to another project (project scoping)
+
assertNull(jobInfoDao.getExecutablePoByUuidWithProject("not_exist_project",
po.getId()));
+ }
+
+ @Test
+ public void testGetJobDetailProjectScoped() throws Exception {
+
Mockito.doNothing().when(aclEvaluate).checkProjectOperationPermission(Mockito.anyString());
+ ExecutablePO po = jobInfoDao.addJob(mockExecutablePO(false, "1"));
+
+ // queried under the owning project -> succeeds
+ List<ExecutableStepResponse> result =
jobInfoService.getJobDetail(DEFAULT_PROJECT, po.getId());
+ assertFalse(result.isEmpty());
+
+ // same job queried under a different project -> JOB_NOT_EXIST
+ assertKylinExeption(() ->
jobInfoService.getJobDetail("not_exist_project", po.getId()), po.getId());
+ }
+
+ @Test
+ public void testListJobsEmptyProjectPermissionFilter() throws Exception {
+ mockDetailJobs(true);
+ getTestConfig().setProperty("kylin.streaming.enabled", "false");
+ ProjectInstance defaultProject =
NProjectManager.getInstance(getTestConfig()).getProject(DEFAULT_PROJECT);
+
+ // empty project + user has OPERATION permission on 'default' -> all 3
jobs are visible
+ Mockito.doReturn(Lists.newArrayList(new
UserProjectPermissionResponse(defaultProject, "OPERATION")))
+
.when(projectService).getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null,
false,
+ AclPermissionEnum.OPERATION);
+ JobFilter jobFilter = new JobFilter(Lists.newArrayList(),
Lists.newArrayList(), 4, "", "", false, null, "",
+ true);
+ List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter, 0,
10);
+ assertEquals(3, jobs.size());
+
+ // empty project + user has no permitted project -> all jobs filtered
out
+ Mockito.doReturn(Lists.newArrayList()).when(projectService)
+
.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false,
+ AclPermissionEnum.OPERATION);
+ assertTrue(jobInfoService.listJobs(jobFilter, 0, 10).isEmpty());
+
+ // empty-string project + OPERATION permission on 'default' and 'default2' -> all 3
jobs are visible
+ jobFilter.setProject(StringUtils.EMPTY);
+ UserProjectPermissionResponse project1 =
Mockito.mock(UserProjectPermissionResponse.class);
+ when(project1.getProject())
+ .thenReturn(ProjectInstance.create(DEFAULT_PROJECT, "UT",
StringUtils.EMPTY, Maps.newLinkedHashMap()));
+ UserProjectPermissionResponse project2 =
Mockito.mock(UserProjectPermissionResponse.class);
+ when(project2.getProject())
+ .thenReturn(ProjectInstance.create("default2", "UT",
StringUtils.EMPTY, Maps.newLinkedHashMap()));
+
when(projectService.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null,
false,
+
AclPermissionEnum.OPERATION)).thenReturn(Lists.newArrayList(project1,
project2));
+ assertEquals(3, jobInfoService.listJobs(jobFilter, 0, 10).size());
+
+ // empty-string project + OPERATION permission only on 'default3' and 'default2' -> all
jobs filtered out
+ when(project1.getProject())
+ .thenReturn(ProjectInstance.create("default3", "UT",
StringUtils.EMPTY, Maps.newLinkedHashMap()));
+ assertTrue(jobInfoService.listJobs(jobFilter, 0, 10).isEmpty());
+ }
+
+ @Test
+ public void testCountJobs() throws Exception {
+ mockDetailJobs(false);
+ getTestConfig().setProperty("kylin.streaming.enabled", "false");
+
+ // count all jobs in project
+ JobFilter jobFilter = new JobFilter(Lists.newArrayList(),
Lists.newArrayList(), 4, "", "", false,
+ DEFAULT_PROJECT, "", true);
+ assertEquals(3, jobInfoService.countJobs(jobFilter));
+ Mockito.verify(aclEvaluate,
Mockito.atLeastOnce()).checkProjectOperationPermission("default");
+
+ // count by status
+ jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW));
+ assertEquals(2, jobInfoService.countJobs(jobFilter));
+ jobFilter.setStatuses(Lists.newArrayList(JobStatusEnum.NEW,
JobStatusEnum.FINISHED));
+ assertEquals(3, jobInfoService.countJobs(jobFilter));
+
+ // fuzzy count by job id when key can not be converted to subjects
+ jobFilter.setStatuses(Lists.newArrayList());
+ jobFilter.setKey("sparkjob1");
+ assertEquals(1, jobInfoService.countJobs(jobFilter));
+ jobFilter.setKey("not_exist_job_id");
+ assertEquals(0, jobInfoService.countJobs(jobFilter));
+
+ // empty project -> count jobs of projects with OPERATION permission
only
+ jobFilter.setKey(StringUtils.EMPTY);
+ jobFilter.setProject(StringUtils.EMPTY);
+ ProjectInstance defaultProject =
NProjectManager.getInstance(getTestConfig()).getProject(DEFAULT_PROJECT);
+ Mockito.doReturn(Lists.newArrayList(new
UserProjectPermissionResponse(defaultProject, "OPERATION")))
+
.when(projectService).getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null,
false,
+ AclPermissionEnum.OPERATION);
+ assertEquals(3, jobInfoService.countJobs(jobFilter));
+
+ // empty project + OPERATION permission only on 'default1' -> all jobs filtered out
+ UserProjectPermissionResponse project1 =
Mockito.mock(UserProjectPermissionResponse.class);
+ when(project1.getProject())
+ .thenReturn(ProjectInstance.create("default1", "UT",
StringUtils.EMPTY, Maps.newLinkedHashMap()));
+ Mockito.doReturn(Lists.newArrayList(project1)).when(projectService)
+
.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false,
+ AclPermissionEnum.OPERATION);
+ assertEquals(0, jobInfoService.countJobs(jobFilter));
}
@Test
@@ -638,7 +754,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
manager.addJob(executable);
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "PAUSE",
Lists.newArrayList());
- Assert.assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
UnitOfWork.doInTransactionWithRetry(() -> {
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "RESUME",
Lists.newArrayList());
@@ -646,24 +762,24 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
}, "default");
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "PAUSE",
Lists.newArrayList());
- Assert.assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
UnitOfWork.doInTransactionWithRetry(() -> {
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "RESUME",
Lists.newArrayList("STOPPED"));
return null;
}, "default");
- Assert.assertEquals(ExecutableState.READY,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.READY,
manager.getJob(executable.getId()).getStatus());
UnitOfWork.doInTransactionWithRetry(() -> {
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "DISCARD",
Lists.newArrayList());
return null;
}, "default");
- Assert.assertEquals(ExecutableState.DISCARDED,
manager.getJob(executable.getId()).getStatus());
-
Assert.assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment());
+ assertEquals(ExecutableState.DISCARDED,
manager.getJob(executable.getId()).getStatus());
+
assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment());
Mockito.doNothing().when(tableExtService).removeJobIdFromTableExt(executable.getId(),
"default");
jobInfoService.batchDropJob("default",
Lists.newArrayList(executable.getId()), Lists.newArrayList());
List<AbstractExecutable> executables = manager.getAllExecutables();
- Assert.assertFalse(executables.contains(executable));
+ assertFalse(executables.contains(executable));
}
@Test
@@ -676,7 +792,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
manager.updateJobOutput(executable.getId(), ExecutableState.PENDING,
null, null, null);
manager.updateJobOutput(executable.getId(), ExecutableState.RUNNING,
null, null, null);
manager.updateJobOutput(executable.getId(), ExecutableState.SUCCEED,
null, null, null);
- Assert.assertEquals(ExecutableState.SUCCEED, executable.getStatus());
+ assertEquals(ExecutableState.SUCCEED, executable.getStatus());
thrown.expect(KylinException.class);
thrown.expectMessage(JOB_UPDATE_STATUS_FAILED.getMsg("DISCARD",
executable.getId(), ExecutableState.SUCCEED));
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "DISCARD",
@@ -698,27 +814,27 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
Mockito.when(projectService.getOwnedProjects()).thenReturn(Lists.newArrayList("default"));
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "PAUSE",
Lists.newArrayList());
- Assert.assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "RESUME",
Lists.newArrayList());
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "PAUSE",
Lists.newArrayList());
- Assert.assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.PAUSED,
manager.getJob(executable.getId()).getStatus());
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "RESUME",
Lists.newArrayList("STOPPED"));
- Assert.assertEquals(ExecutableState.READY,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.READY,
manager.getJob(executable.getId()).getStatus());
jobInfoService.batchUpdateJobStatus(Lists.newArrayList(executable.getId()),
"default", "DISCARD",
Lists.newArrayList());
- Assert.assertEquals(ExecutableState.DISCARDED,
manager.getJob(executable.getId()).getStatus());
+ assertEquals(ExecutableState.DISCARDED,
manager.getJob(executable.getId()).getStatus());
-
Assert.assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment());
+
assertNull(dsMgr.getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().getFirstSegment());
Mockito.doNothing().when(tableExtService).removeJobIdFromTableExt(executable.getId(),
"default");
jobInfoService.batchDropGlobalJob(Lists.newArrayList(executable.getId()),
Lists.newArrayList());
- Assert.assertFalse(manager.getAllExecutables().contains(executable));
+ assertFalse(manager.getAllExecutables().contains(executable));
}
@Test
@@ -770,7 +886,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
sampleLog = sampleData.toString();
}
String[] actualLines =
StringUtils.splitByWholeSeparatorPreserveAllTokens(sampleLog, "\n");
- Assert.assertTrue(Arrays.deepEquals(exceptLines, actualLines));
+ assertTrue(Arrays.deepEquals(exceptLines, actualLines));
}
public void testFusionModelStopBatchJob() {
@@ -796,7 +912,7 @@ public class JobInfoServiceTest extends LogOutputTestCase {
return null;
}, project);
AbstractExecutable job = manager.getJob(executable.getId());
- Assert.assertEquals(ExecutableState.DISCARDED, job.getStatus());
+ assertEquals(ExecutableState.DISCARDED, job.getStatus());
// test no fusion model
String table2 = "SSB.DATES";
@@ -809,26 +925,26 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
@Test
public void testKillExistApplication() {
- ExecutableManager manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), getProject());
+ ExecutableManager manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), DEFAULT_PROJECT);
SucceedChainedTestExecutable executable = new
SucceedChainedTestExecutable();
- executable.setProject(getProject());
+ executable.setProject(DEFAULT_PROJECT);
addSegment(executable);
val task = new NSparkExecutable();
- task.setProject(getProject());
+ task.setProject(DEFAULT_PROJECT);
addSegment(task);
executable.addTask(task);
executable.setJobType(JobTypeEnum.INC_BUILD);
manager.addJob(executable);
jobInfoService.killExistApplication(executable);
- jobInfoService.killExistApplication(getProject(), executable.getId());
+ jobInfoService.killExistApplication(DEFAULT_PROJECT,
executable.getId());
}
@Test
public void testSetExceptionResolveAndCode() {
- val manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), project);
+ val manager =
ExecutableManager.getInstance(jobInfoService.getConfig(), DEFAULT_PROJECT);
val executable = new SucceedChainedTestExecutable();
- executable.setProject(project);
+ executable.setProject(DEFAULT_PROJECT);
executable.setId(RandomUtil.randomUUIDStr());
executable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
executable.setJobType(JobTypeEnum.INC_BUILD);
@@ -839,46 +955,46 @@ public class JobInfoServiceTest extends LogOutputTestCase
{
var failedSegmentId = RandomUtil.randomUUIDStr();
var failedStack = ExceptionUtils.getStackTrace(new
NoRetryException("date format not match"));
var failedReason = "date format not match";
- jobInfoService.updateJobError(project, jobId, failedStepId,
failedSegmentId, failedStack, failedReason);
+ jobInfoService.updateJobError(DEFAULT_PROJECT, jobId, failedStepId,
failedSegmentId, failedStack, failedReason);
ExecutableStepResponse executableStepResponse = new
ExecutableStepResponse();
jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(),
executableStepResponse);
-
Assert.assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(),
+
assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(),
executableStepResponse.getFailedResolve());
-
Assert.assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(),
+
assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(),
executableStepResponse.getFailedCode());
-
Assert.assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(),
+
assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(),
executableStepResponse.getFailedReason());
ErrorCode.setMsg("en");
ExceptionResolve.setLang("en");
jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(),
executableStepResponse);
-
Assert.assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(),
+
assertEquals(JobExceptionResolve.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionResolve().getResolve(),
executableStepResponse.getFailedResolve());
-
Assert.assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(),
+
assertEquals(JobErrorCode.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toErrorCode().getLocalizedString(),
executableStepResponse.getFailedCode());
-
Assert.assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(),
+
assertEquals(JobExceptionReason.JOB_DATE_FORMAT_NOT_MATCH_ERROR.toExceptionReason().getReason(),
executableStepResponse.getFailedReason());
// test default reason / code / resolve
manager.updateJobError(jobId, null, null, null, null);
- jobInfoService.updateJobError(project, jobId, failedStepId,
failedSegmentId, failedStack, "test");
+ jobInfoService.updateJobError(DEFAULT_PROJECT, jobId, failedStepId,
failedSegmentId, failedStack, "test");
jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(),
executableStepResponse);
-
Assert.assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(),
+
assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(),
executableStepResponse.getFailedResolve());
-
Assert.assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(),
+
assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(),
executableStepResponse.getFailedCode());
- Assert.assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test",
+ assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test",
executableStepResponse.getFailedReason());
ErrorCode.setMsg("en");
ExceptionResolve.setLang("en");
jobInfoService.setExceptionResolveAndCodeAndReason(executable.getOutput(),
executableStepResponse);
-
Assert.assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(),
+
assertEquals(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve(),
executableStepResponse.getFailedResolve());
-
Assert.assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(),
+
assertEquals(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString(),
executableStepResponse.getFailedCode());
- Assert.assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test",
+ assertEquals(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": test",
executableStepResponse.getFailedReason());
}
@@ -904,20 +1020,21 @@ public class JobInfoServiceTest extends LogOutputTestCase {
@Test
public void testDiscardJobAndNotify() {
- ExecutableManager manager =
ExecutableManager.getInstance(getTestConfig(), project);
+ ExecutableManager manager =
ExecutableManager.getInstance(getTestConfig(), DEFAULT_PROJECT);
val job = new DefaultExecutable();
- job.setProject(project);
+ job.setProject(DEFAULT_PROJECT);
job.setJobType(JobTypeEnum.INC_BUILD);
manager.addJob(job);
overwriteSystemProp("kylin.job.notification-enabled", "true");
UnitOfWork.doInTransactionWithRetry(() -> {
- jobInfoService.updateJobStatus(job.getId(),
ExecutableManager.toPO(job, project), project, "DISCARD");
+ jobInfoService.updateJobStatus(job.getId(),
ExecutableManager.toPO(job, DEFAULT_PROJECT), DEFAULT_PROJECT,
+ "DISCARD");
return null;
- }, project);
+ }, DEFAULT_PROJECT);
- Assert.assertTrue(containsLog("[Job Discarded] is not specified by user, not need to notify users."));
+ assertTrue(containsLog("[Job Discarded] is not specified by user, not need to notify users."));
}
@Test
@@ -951,18 +1068,18 @@ public class JobInfoServiceTest extends LogOutputTestCase {
List<ExecutableResponse> jobs = jobInfoService.listJobs(jobFilter);
List<ExecutableResponse> executableResponses =
jobInfoService.addOldParams(jobs);
ExecutableResponse executable = executableResponses.get(0);
- Assert.assertEquals("", executable.getRelatedSegment());
- Assert.assertEquals(0, executable.getProgress(), 0);
+ assertEquals("", executable.getRelatedSegment());
+ assertEquals(0, executable.getProgress(), 0);
executable.getSteps().get(0).setStatus(JobStatusEnum.FINISHED);
- Assert.assertEquals(33, executable.getProgress(), 1);
+ assertEquals(33, executable.getProgress(), 1);
executable.setSteps(null);
String uuid = UUID.randomUUID().toString();
executable.setTargetSegments(Lists.newArrayList(uuid));
- Assert.assertEquals(0.0, executable.getProgress(), 0);
- Assert.assertEquals(uuid, executable.getRelatedSegment());
+ assertEquals(0.0, executable.getProgress(), 0);
+ assertEquals(uuid, executable.getRelatedSegment());
executable.setTargetSegments(Collections.emptyList());
- Assert.assertEquals(0.0, executable.getProgress(), 0);
- Assert.assertEquals("", executable.getRelatedSegment());
+ assertEquals(0.0, executable.getProgress(), 0);
+ assertEquals("", executable.getRelatedSegment());
}
@Test
@@ -971,19 +1088,19 @@ public class JobInfoServiceTest extends LogOutputTestCase {
task.setProject("default");
ExecutableState jobState = ExecutableState.RUNNING;
ExecutableStepResponse result =
jobInfoService.parseToExecutableStep(task, null, new HashMap<>(), jobState);
- Assert.assertSame(PENDING, result.getStatus());
+ assertSame(PENDING, result.getStatus());
}
@Test
public void testJobDiscard() {
ExecutableManager executableManager =
Mockito.mock(ExecutableManager.class);
- Mockito.when(jobInfoService.getManager(ExecutableManager.class,
project)).thenReturn(executableManager);
+ Mockito.when(jobInfoService.getManager(ExecutableManager.class,
DEFAULT_PROJECT)).thenReturn(executableManager);
Mockito.doAnswer(invocation -> {
// ensure unit of work transaction
- Assert.assertNotNull(UnitOfWork.get());
+ assertNotNull(UnitOfWork.get());
return null;
}).when(executableManager).discardJob(Mockito.any());
- jobInfoService.discardJobs(project, Lists.newArrayList("job1",
"job2"));
+ jobInfoService.discardJobs(DEFAULT_PROJECT, Lists.newArrayList("job1",
"job2"));
Mockito.verify(executableManager, Mockito.times(1)).discardJob("job1");
Mockito.verify(executableManager, Mockito.times(1)).discardJob("job2");
}
@@ -991,13 +1108,13 @@ public class JobInfoServiceTest extends LogOutputTestCase {
@Test
public void testSuicideJobOfModel() {
ExecutableManager executableManager =
Mockito.mock(ExecutableManager.class);
- Mockito.when(jobInfoService.getManager(ExecutableManager.class,
project)).thenReturn(executableManager);
+ Mockito.when(jobInfoService.getManager(ExecutableManager.class,
DEFAULT_PROJECT)).thenReturn(executableManager);
Mockito.doAnswer(invocation -> {
// ensure unit of work transaction
- Assert.assertNotNull(UnitOfWork.get());
+ assertNotNull(UnitOfWork.get());
return null;
}).when(executableManager).checkSuicideJobOfModel(Mockito.any(),
Mockito.anyString());
- jobInfoService.checkSuicideJobOfModel(project, "test");
- Mockito.verify(executableManager,
Mockito.times(1)).checkSuicideJobOfModel(project, "test");
+ jobInfoService.checkSuicideJobOfModel(DEFAULT_PROJECT, "test");
+ Mockito.verify(executableManager,
Mockito.times(1)).checkSuicideJobOfModel(DEFAULT_PROJECT, "test");
}
}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java
new file mode 100644
index 0000000000..412be07cb0
--- /dev/null
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/util/JobFilterUtilTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.job.rest.JobFilter;
+import org.apache.kylin.job.rest.JobMapperFilter;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.response.UserProjectPermissionResponse;
+import org.apache.kylin.rest.security.AclPermissionEnum;
+import org.apache.kylin.rest.service.ModelService;
+import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.rest.service.TableExtService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class JobFilterUtilTest {
+
+ // JobTimeFilterEnum.ALL -> avoids relative-date computation in getQueryStartTime
+ private static final int TIME_FILTER_ALL = 4;
+
+ private ModelService modelService;
+ private TableExtService tableExtService;
+ private ProjectService projectService;
+
+ @Before
+ public void setUp() {
+ modelService = Mockito.mock(ModelService.class);
+ tableExtService = Mockito.mock(TableExtService.class);
+ projectService = Mockito.mock(ProjectService.class);
+ }
+
+ private JobFilter newJobFilter(String project, boolean reverse) {
+ return new JobFilter(Lists.newArrayList(), Lists.newArrayList(),
TIME_FILTER_ALL, "", "", false, project, "",
+ reverse);
+ }
+
+ private UserProjectPermissionResponse permissionResponse(String
projectName) {
+ ProjectInstance instance = new ProjectInstance();
+ instance.setName(projectName);
+ return new UserProjectPermissionResponse(instance,
AclPermissionEnum.OPERATION.name());
+ }
+
+ @Test
+ public void testGetJobMapperFilterEmptyProjectPopulatesProjects() throws
Exception {
+
Mockito.when(projectService.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null,
false,
+ AclPermissionEnum.OPERATION))
+ .thenReturn(Lists.newArrayList(permissionResponse("p1"),
permissionResponse("p2")));
+
+ JobMapperFilter filter =
JobFilterUtil.getJobMapperFilter(newJobFilter(null, true), 0, 10, modelService,
+ tableExtService, projectService);
+
+ Assert.assertEquals(Lists.newArrayList("p1", "p2"),
filter.getProjects());
+ Assert.assertTrue(StringUtils.isEmpty(filter.getProject()));
+ Mockito.verify(projectService, Mockito.times(1))
+
.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null, false,
+ AclPermissionEnum.OPERATION);
+ }
+
+ @Test
+ public void testGetJobMapperFilterEmptyProjectNoPermittedProjects() throws
Exception {
+
Mockito.when(projectService.getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(null,
false,
+ AclPermissionEnum.OPERATION)).thenReturn(Lists.newArrayList());
+
+ JobMapperFilter filter =
JobFilterUtil.getJobMapperFilter(newJobFilter("", true), 0, 10, modelService,
+ tableExtService, projectService);
+
+ Assert.assertNotNull(filter.getProjects());
+ Assert.assertTrue(filter.getProjects().isEmpty());
+ }
+
+ @Test
+ public void testGetJobMapperFilterWithProjectKeepsProjectsNull() throws
Exception {
+ JobMapperFilter filter =
JobFilterUtil.getJobMapperFilter(newJobFilter("default", true), 5, 20,
modelService,
+ tableExtService, projectService);
+
+ Assert.assertEquals("default", filter.getProject());
+ // single-project branch must not populate the multi-project list
+ Assert.assertNull(filter.getProjects());
+ Assert.assertEquals(5, filter.getOffset());
+ Assert.assertEquals(20, filter.getLimit());
+ Mockito.verify(projectService,
Mockito.never()).getProjectsFilterByExactMatchAndPermissionWrapperUserPermission(
+ Mockito.any(), Mockito.anyBoolean(), Mockito.any());
+ }
+
+ @Test
+ public void testGetJobMapperFilterOrderType() throws Exception {
+ JobMapperFilter reversed =
JobFilterUtil.getJobMapperFilter(newJobFilter("default", true), 0, 10,
modelService,
+ tableExtService, projectService);
+ Assert.assertEquals("DESC", reversed.getOrderType());
+
+ JobMapperFilter ascending =
JobFilterUtil.getJobMapperFilter(newJobFilter("default", false), 0, 10,
+ modelService, tableExtService, projectService);
+ Assert.assertEquals("ASC", ascending.getOrderType());
+ }
+}