This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new aef2fbf36c [Python] Migrate exists method call in PythonGateway to
corresponding service (#11188)
aef2fbf36c is described below
commit aef2fbf36c76d4dbfcf566ec959acea00e7bec4e
Author: 陈家名 <[email protected]>
AuthorDate: Thu Aug 4 12:50:21 2022 +0800
[Python] Migrate exists method call in PythonGateway to corresponding
service (#11188)
Migrate all existing method calls in PythonGateway to the corresponding service
---
.../dolphinscheduler/api/python/PythonGateway.java | 16 ++--------
.../api/service/ResourcesService.java | 10 ++++++
.../api/service/impl/ResourcesServiceImpl.java | 11 +++++++
.../api/python/PythonGatewayTest.java | 18 ++---------
.../api/service/ResourcesServiceTest.java | 18 +++++++++++
.../pydolphinscheduler/core/process_definition.py | 11 +++----
.../src/pydolphinscheduler/core/resource.py | 36 ++++++++++++++++++++--
.../src/pydolphinscheduler/core/task.py | 29 +++++++++--------
.../tests/core/test_resource_definition.py | 36 ++++++++++++++++++++--
.../pydolphinscheduler/tests/core/test_task.py | 22 +++++++++----
10 files changed, 148 insertions(+), 59 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 57e289137a..d0489cce62 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -24,7 +24,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -572,19 +571,8 @@ public class PythonGateway {
* @param userName user who query resource
* @param fullName full name of the resource
*/
- public Map<String, Object> queryResourcesFileInfo(String userName, String
fullName) {
- Map<String, Object> result = new HashMap<>();
- User user = usersService.queryUser(userName);
- Result<Object> resourceResponse = resourceService.queryResource(user,
fullName, null, ResourceType.FILE);
- if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
- String msg = String.format("Can not find valid resource by name
%s", fullName);
- logger.error(msg);
- throw new IllegalArgumentException(msg);
- }
- Resource resource = (Resource) resourceResponse.getData();
- result.put("id", resource.getId());
- result.put("name", resource.getFullName());
- return result;
+ public Resource queryResourcesFileInfo(String userName, String fullName) {
+ return resourceService.queryResourcesFileInfo(userName, fullName);
}
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index fb0fee5e17..00c6fa151b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType;
+import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -219,6 +220,15 @@ public interface ResourcesService {
*/
Map<String, Object> authorizeResourceTree(User loginUser, Integer userId);
+ /**
+ * Get resource by given resource type and full name.
+ * Useful when the Python API creates a task that needs processDefinition
information.
+ *
+ * @param userName user who query resource
+ * @param fullName full name of the resource
+ */
+ Resource queryResourcesFileInfo(String userName, String fullName);
+
/**
* unauthorized file
*
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index 55fd2f275f..16e230a705 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -1448,6 +1448,17 @@ public class ResourcesServiceImpl extends
BaseServiceImpl implements ResourcesSe
return result;
}
+ @Override
+ public Resource queryResourcesFileInfo(String userName, String fullName) {
+ User user = userMapper.queryByUserNameAccurately(userName);
+ Result<Object> resourceResponse = this.queryResource(user, fullName,
null, ResourceType.FILE);
+ if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
+ String msg = String.format("Can not find valid resource by name
%s", fullName);
+ throw new IllegalArgumentException(msg);
+ }
+ return (Resource) resourceResponse.getData();
+ }
+
/**
* unauthorized file
*
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
index 022eb344a5..2a49ed307b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.api.python;
-import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ResourcesService;
-import org.apache.dolphinscheduler.api.service.UsersService;
-import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -63,9 +60,6 @@ public class PythonGatewayTest {
@Mock
private ResourcesService resourcesService;
- @Mock
- private UsersService usersService;
-
@Test
public void testGetCodeAndVersion() throws
CodeGenerateUtils.CodeGenerateException {
Project project = getTestProject();
@@ -120,16 +114,10 @@ public class PythonGatewayTest {
@Test
public void testQueryResourcesFileInfo() {
User user = getTestUser();
-
Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user);
-
- Result<Object> mockResult = new Result<>();
- mockResult.setCode(Status.SUCCESS.getCode());
Resource resource = getTestResource();
- mockResult.setData(resource);
- Mockito.when(resourcesService.queryResource(user,
resource.getFullName(), null, ResourceType.FILE)).thenReturn(mockResult);
-
- Map<String, Object> result =
pythonGateway.queryResourcesFileInfo(user.getUserName(),
resource.getFullName());
- Assert.assertEquals((int) result.get("id"), resource.getId());
+
Mockito.when(resourcesService.queryResourcesFileInfo(user.getUserName(),
resource.getFullName())).thenReturn(resource);
+ Resource result =
pythonGateway.queryResourcesFileInfo(user.getUserName(),
resource.getFullName());
+ Assert.assertEquals(result.getId(), resource.getId());
}
private Resource getTestResource() {
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index 961d5f7763..1579210064 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -716,6 +716,24 @@ public class ResourcesServiceTest {
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
+ @Test
+ public void testQueryResourcesFileInfo() {
+ User user = getUser();
+ String userName = "test-user";
+
Mockito.when(userMapper.queryByUserNameAccurately(userName)).thenReturn(user);
+ Resource file = new Resource();
+ file.setFullName("/dir/file1.py");
+ file.setId(1);
+ Mockito.when(resourcesMapper.queryResource(file.getFullName(),
ResourceType.FILE.ordinal()))
+ .thenReturn(Collections.singletonList(file));
+
PowerMockito.when(resourcePermissionCheckService.operationPermissionCheck(
+ AuthorizationType.RESOURCE_FILE_ID, null, user.getId(),
ApiFuncIdentificationConstant.FILE_VIEW, serviceLogger)).thenReturn(true);
+
PowerMockito.when(resourcePermissionCheckService.resourcePermissionCheck(
+ AuthorizationType.RESOURCE_FILE_ID, new
Object[]{file.getId()}, user.getId(), serviceLogger)).thenReturn(true);
+ Resource result = resourcesService.queryResourcesFileInfo(userName,
file.getFullName());
+ Assert.assertEquals(file.getFullName(), result.getFullName());
+ }
+
@Test
public void testUpdateResourceContent() {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 31d1e6124b..1740beafdf 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -23,6 +23,7 @@ from typing import Any, Dict, List, Optional, Set
from pydolphinscheduler import configuration
from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.exceptions import PyDSParamException,
PyDSTaskNoFoundException
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.models import Base, Project, Tenant, User
@@ -110,7 +111,7 @@ class ProcessDefinition(Base):
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
- resource_list: Optional[List] = None,
+ resource_list: Optional[List[Resource]] = None,
):
super().__init__(name, description)
self.schedule = schedule
@@ -414,12 +415,8 @@ class ProcessDefinition(Base):
)
if len(self.resource_list) > 0:
for res in self.resource_list:
- gateway.entry_point.createOrUpdateResource(
- self._user,
- res.name,
- res.description,
- res.content,
- )
+ res.user_name = self._user
+ res.create_or_update_resource()
return self._process_definition_code
def start(self) -> None:
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
index a3aab81d17..8015a72e39 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/resource.py
@@ -19,6 +19,8 @@
from typing import Optional
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.models import Base
@@ -28,16 +30,46 @@ class Resource(Base):
:param name: The fullname of resource.Includes path and suffix.
:param content: The description of resource.
:param description: The description of resource.
+ :param user_name: The user name of resource.
"""
- _DEFINE_ATTR = {"name", "content", "description"}
+ _DEFINE_ATTR = {"name", "content", "description", "user_name"}
def __init__(
self,
name: str,
- content: str,
+ content: Optional[str] = None,
description: Optional[str] = None,
+ user_name: Optional[str] = None,
):
super().__init__(name, description)
self.content = content
+ self.user_name = user_name
self._resource_code = None
+
+ def get_info_from_database(self):
+ """Get resource info from java gateway, contains resource id, name."""
+ if not self.user_name:
+ raise PyDSParamException(
+ "`user_name` is required when querying resources from python
gate."
+ )
+ gateway = launch_gateway()
+ return gateway.entry_point.queryResourcesFileInfo(self.user_name,
self.name)
+
+ def get_id_from_database(self):
+ """Get resource id from java gateway."""
+ return self.get_info_from_database().getId()
+
+ def create_or_update_resource(self):
+ """Create or update resource via java gateway."""
+ if not self.content or not self.user_name:
+ raise PyDSParamException(
+ "`user_name` and `content` are required when create or update
resource from python gate."
+ )
+ gateway = launch_gateway()
+ gateway.entry_point.createOrUpdateResource(
+ self.user_name,
+ self.name,
+ self.description,
+ self.content,
+ )
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 4d4e67e323..b866ea221f 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -32,6 +32,8 @@ from pydolphinscheduler.core.process_definition import (
ProcessDefinition,
ProcessDefinitionContext,
)
+from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.models import Base
@@ -175,18 +177,28 @@ class Task(Base):
def resource_list(self) -> List:
"""Get task define attribute `resource_list`."""
resources = set()
- for resource in self._resource_list:
- if type(resource) == str:
-
resources.add(self.query_resource(resource).get(ResourceKey.ID))
- elif type(resource) == dict and resource.get(ResourceKey.ID) is
not None:
+ for res in self._resource_list:
+ if type(res) == str:
+ resources.add(
+ Resource(name=res,
user_name=self.user_name).get_id_from_database()
+ )
+ elif type(res) == dict and res.get(ResourceKey.ID) is not None:
logger.warning(
"""`resource_list` should be defined using List[str] with
resource paths,
the use of ids to define resources will be remove in
version 3.2.0.
"""
)
- resources.add(resource.get(ResourceKey.ID))
+ resources.add(res.get(ResourceKey.ID))
return [{ResourceKey.ID: r} for r in resources]
+ @property
+ def user_name(self) -> Optional[str]:
+ """Return user name of process definition."""
+ if self.process_definition:
+ return self.process_definition.user.name
+ else:
+ raise PyDSParamException("`user_name` cannot be empty.")
+
@property
def condition_result(self) -> Dict:
"""Get attribute condition_result."""
@@ -295,10 +307,3 @@ class Task(Base):
# result =
gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
-
- def query_resource(self, full_name):
- """Get resource info from java gateway, contains resource id, name."""
- gateway = launch_gateway()
- return gateway.entry_point.queryResourcesFileInfo(
- self.process_definition.user.name, full_name
- )
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py
index ebfb8936f8..07fcac3547 100644
---
a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_resource_definition.py
@@ -16,8 +16,10 @@
# under the License.
"""Test resource definition."""
+import pytest
from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.exceptions import PyDSParamException
def test_resource():
@@ -25,14 +27,42 @@ def test_resource():
name = "/dev/test.py"
content = """print("hello world")"""
description = "hello world"
+ user_name = "test_user"
expect = {
"name": name,
"content": content,
"description": description,
+ "userName": user_name,
}
resourceDefinition = Resource(
- name=name,
- content=content,
- description=description,
+ name=name, content=content, description=description,
user_name=user_name
)
assert resourceDefinition.get_define() == expect
+
+
+def test_empty_user_name():
+ """Tests for the exception get info from database when the user name is
null."""
+ name = "/dev/test.py"
+ content = """print("hello world")"""
+ description = "hello world"
+ resourceDefinition = Resource(name=name, content=content,
description=description)
+ with pytest.raises(
+ PyDSParamException,
+ match="`user_name` is required when querying resources from python
gate.",
+ ):
+ resourceDefinition.get_info_from_database()
+
+
+def test_empty_content():
+ """Tests for the exception create or update resource when the user name or
content is empty."""
+ name = "/dev/test.py"
+ user_name = "test_user"
+ description = "hello world"
+ resourceDefinition = Resource(
+ name=name, description=description, user_name=user_name
+ )
+ with pytest.raises(
+ PyDSParamException,
+ match="`user_name` and `content` are required when create or update
resource from python gate.",
+ ):
+ resourceDefinition.create_or_update_resource()
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 65555c1eb5..201bdb30cd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -63,10 +63,14 @@ TEST_TASK_RELATION_SIZE = 0
],
)
@patch(
- "pydolphinscheduler.core.task.Task.query_resource",
- return_value=({"id": 1, "name": "foo"}),
+ "pydolphinscheduler.core.resource.Resource.get_id_from_database",
+ return_value=1,
)
-def test_property_task_params(mock_resource, attr, expect):
+@patch(
+ "pydolphinscheduler.core.task.Task.user_name",
+ return_value="test_user",
+)
+def test_property_task_params(mock_resource, mock_user_name, attr, expect):
"""Test class task property."""
task = testTask(
"test-property-task-params",
@@ -265,10 +269,16 @@ def test_add_duplicate(caplog):
return_value=(123, 1),
)
@patch(
- "pydolphinscheduler.core.task.Task.query_resource",
- return_value=({"id": 1, "name": "/dev/test.py"}),
+ "pydolphinscheduler.core.resource.Resource.get_id_from_database",
+ return_value=1,
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.user_name",
+ return_value="test_user",
)
-def test_python_resource_list(mock_code_version, mock_resource, resources,
expect):
+def test_python_resource_list(
+ mock_code_version, mock_resource, mock_user_name, resources, expect
+):
"""Test python task resource list."""
task = Task(
name="python_resource_list.",