This is an automated email from the ASF dual-hosted git repository.
xiaozhenliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 83076ac501 feat(amber): enable user system by default (#3782)
83076ac501 is described below
commit 83076ac5010b10269f77485e935a4a69d0fe4577
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Tue Oct 7 16:26:22 2025 -0700
feat(amber): enable user system by default (#3782)
## Purpose
This PR enables the user system by default in the configuration.
Currently, this flag is disabled by default (a.k.a. the non-user mode).
As no one is using the non-user mode and all developers are already
required to enable the user system, we have decided to abandon the
non-user mode.
## Challenge & Design
The major blockers to enabling the flag by default are two e2e test
suites that rely on the non-user mode. Each test case in these two
suites executes a workflow in the Amber engine. Enabling the user system
would require `texera_db` in the test environment, because in
user-system mode, executing a workflow requires an `eid` (and
subsequently a `vid`, `wid`, and `uid`) in `texera_db`.
We could use `MockTexeraDB`, which many unit tests already use.
`MockTexeraDB` creates an embedded Postgres instance per test suite and
destroys it at the end of that test suite.
However, a complication is that the two e2e test suites both access the
singleton resource `WorkflowExecutionsResource`, which caches the DSL
context from `SqlServer` (i.e., the context is evaluated only once per JVM):
```scala
final private lazy val context = SqlServer
.getInstance()
.createDSLContext()
```
In fact, most of the singleton resources in our current codebase cache
the `DSLContext` / DAO, because the `DSLContext` never changes in a real
Texera deployment (i.e., the real `texera_db`'s address never changes).
In the test environment, however, this assumption does not hold when
working with `MockTexeraDB`: each `MockTexeraDB` instance has a different
address and is destroyed before the next test suite runs. Since all test
suites are executed in the same JVM during a CI run, using `MockTexeraDB`
would cause the second of the two e2e test suites to fail, because it
would still use the DSL context pointing to the first suite's
`MockTexeraDB`.
The diagrams below show what happens when using the embedded
`MockTexeraDB` to run two e2e test suites that both need to access the
same singleton resource during their execution.
The first test suite creates an embedded DB (`DB1`) and has the singleton
`SqlServer` object point its `DSLContext` to `DB1`. When the test cases
first access `WorkflowExecutionsResource` (`WER`), `WER` grabs the
`DSLContext` from `SqlServer` and caches it. `WER` then queries `DB1`
for all the test cases of test suite 1. When test suite 1 finishes,
`DB1` is destroyed.

Later, in the same JVM, test suite 2 starts, creates its own embedded DB
(`DB2`), and points `SqlServer` to `DB2`. However, because the
`DSLContext` in `WER` is cached, it is not refreshed when the test cases
access `WER`, so `WER` still points to `DB1`, which has already been
destroyed, and the test cases fail.

To solve this problem, we could either:
1. Avoid caching DSLContext/Dao in the codebase, or
2. Let the two e2e test cases use the same real, external database (same
as production environment) instead of `MockTexeraDB`.
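Option 1 could look roughly like the following sketch. It uses the same kind of hypothetical stand-ins (not Texera classes), but the resource declares its context with `def` instead of `lazy val`, so the context is re-read from `FakeSqlServer` on every access.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins, not Texera classes.
object FakeDatabases {
  val alive: mutable.Set[String] = mutable.Set.empty[String]
}

final case class FakeDSLContext(dbAddress: String) {
  def query(): String = {
    require(FakeDatabases.alive(dbAddress), s"$dbAddress has been destroyed")
    s"rows from $dbAddress"
  }
}

object FakeSqlServer {
  private var current: Option[FakeDSLContext] = None
  def initConnection(dbAddress: String): Unit =
    current = Some(FakeDSLContext(dbAddress))
  def createDSLContext(): FakeDSLContext = current.get
}

// Option 1: `def` instead of `lazy val`. Nothing is cached, so every call
// follows whichever database FakeSqlServer currently points to.
object UncachedExecutionsResource {
  private def context: FakeDSLContext = FakeSqlServer.createDSLContext()
  def fetch(): String = context.query()
}

// Test suite 1 with its own embedded DB1, destroyed at the end.
FakeDatabases.alive += "DB1"
FakeSqlServer.initConnection("DB1")
val suite1Result = Try(UncachedExecutionsResource.fetch())
FakeDatabases.alive -= "DB1"

// Test suite 2 with its own embedded DB2.
FakeDatabases.alive += "DB2"
FakeSqlServer.initConnection("DB2")
val suite2Result = Try(UncachedExecutionsResource.fetch())
```

Both suites succeed here. The cost is that every caching singleton would need the same change, and most singleton resources in the codebase currently cache their `DSLContext` / DAO; this PR does not take this route.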
**We choose the 2nd design, as these two are e2e tests which should
emulate production behavior with a real database.** To avoid polluting
the developer's local `texera_db`, we use a separate test database with
the same schema.
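The second design also sidesteps the caching problem: both suites point `SqlServer` at the same long-lived database, so a context cached during suite 1 is still valid in suite 2. The sketch below models this with hypothetical stand-ins (not Texera classes); only the database name `texera_db_for_test_cases` comes from this PR, and the resource deliberately keeps its `lazy val` cache.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-ins, not Texera classes.
object FakeDatabases {
  val alive: mutable.Set[String] = mutable.Set.empty[String]
}

final case class FakeDSLContext(dbAddress: String) {
  def query(): String = {
    require(FakeDatabases.alive(dbAddress), s"$dbAddress has been destroyed")
    s"rows from $dbAddress"
  }
}

object FakeSqlServer {
  private var current: Option[FakeDSLContext] = None
  def initConnection(dbAddress: String): Unit =
    current = Some(FakeDSLContext(dbAddress))
  def createDSLContext(): FakeDSLContext = current.get
}

// Still cached with a lazy val, exactly like the problematic case.
object FakeExecutionsResource {
  private lazy val context: FakeDSLContext = FakeSqlServer.createDSLContext()
  def fetch(): String = context.query()
}

// The external test database is created once, outside the test JVM,
// and is never destroyed between suites.
val externalTestDB = "texera_db_for_test_cases"
FakeDatabases.alive += externalTestDB

// Suite 1 (beforeAll): point SqlServer at the external test database.
FakeSqlServer.initConnection(externalTestDB)
val suite1Result = Try(FakeExecutionsResource.fetch())
// Suite 1 ends: only its rows would be cleaned up; the database survives.

// Suite 2 (beforeAll): same address, so the cached context is still valid.
FakeSqlServer.initConnection(externalTestDB)
val suite2Result = Try(FakeExecutionsResource.fetch())
```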
## Changes
- Enables `user-sys` by default.
- Introduces a `texera_db_for_test_cases` database specifically for test
cases and CI. `texera_ddl.sql` is updated to allow creating the database
with a name other than `texera_db` (the name still defaults to
`texera_db`), and CI automatically creates `texera_db_for_test_cases`
with the same schema as `texera_db`.
- Updates `DataProcessingSpec` and `PauseSpec` to use
`texera_db_for_test_cases`. The two test suites now populate and clean up
this database before and after each test case.
- `MockTexeraDB` is updated to incorporate the changes to the DDL
script.
- Adds a `clearInstance` method to `SqlServer`, so that unit tests using
`MockTexeraDB` can properly clear their instance from `SqlServer` and do
not interfere with the two e2e tests.
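To illustrate the per-test-case setup and cleanup, here is a sketch of the ordering that `setUpWorkflowExecutionData` and `cleanupWorkflowExecutionData` in `TestUtils` follow: rows are inserted parent first (user, workflow, version, execution) and deleted child first, all with the fixed id 1. The in-memory `FakeTexeraDB` is hypothetical, and its foreign-key checks are an assumption inferred from that order rather than copied from `texera_ddl.sql`.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical in-memory model of the four rows TestUtils writes.
// Assumed constraints: an execution references a version (vid) and a
// user (uid); a version references a workflow (wid).
final class FakeTexeraDB {
  val users = mutable.Set.empty[Int]
  val workflows = mutable.Set.empty[Int]
  val versions = mutable.Map.empty[Int, Int]          // vid -> wid
  val executions = mutable.Map.empty[Int, (Int, Int)] // eid -> (vid, uid)

  def insertUser(uid: Int): Unit = {
    require(!users(uid), s"duplicate uid $uid")
    users += uid
  }
  def insertWorkflow(wid: Int): Unit = {
    require(!workflows(wid), s"duplicate wid $wid")
    workflows += wid
  }
  def insertVersion(vid: Int, wid: Int): Unit = {
    require(!versions.contains(vid), s"duplicate vid $vid")
    require(workflows(wid), s"workflow $wid does not exist")
    versions(vid) = wid
  }
  def insertExecution(eid: Int, vid: Int, uid: Int): Unit = {
    require(!executions.contains(eid), s"duplicate eid $eid")
    require(versions.contains(vid) && users(uid), "version or user does not exist")
    executions(eid) = (vid, uid)
  }
  def deleteExecution(eid: Int): Unit = executions -= eid
  def deleteVersion(vid: Int): Unit = {
    require(!executions.values.exists(_._1 == vid), s"version $vid is still referenced")
    versions -= vid
  }
  def deleteWorkflow(wid: Int): Unit = {
    require(!versions.values.exists(_ == wid), s"workflow $wid is still referenced")
    workflows -= wid
  }
  def deleteUser(uid: Int): Unit = {
    require(!executions.values.exists(_._2 == uid), s"user $uid is still referenced")
    users -= uid
  }
  def isEmpty: Boolean =
    users.isEmpty && workflows.isEmpty && versions.isEmpty && executions.isEmpty
}

// Parents first, in the same order as setUpWorkflowExecutionData.
def setUp(db: FakeTexeraDB): Unit = {
  db.insertUser(1)
  db.insertWorkflow(1)
  db.insertVersion(vid = 1, wid = 1)
  db.insertExecution(eid = 1, vid = 1, uid = 1)
}

// Children first, in the same order as cleanupWorkflowExecutionData.
def cleanup(db: FakeTexeraDB): Unit = {
  db.deleteExecution(1)
  db.deleteVersion(1)
  db.deleteWorkflow(1)
  db.deleteUser(1)
}

val db = new FakeTexeraDB
// Two test cases, each wrapped in beforeEach / afterEach.
for (_ <- 1 to 2) { setUp(db); cleanup(db) }
val emptyBetweenCases = db.isEmpty

setUp(db)
// Deleting a parent row first violates the (assumed) foreign keys.
val parentFirstFails = Try(db.deleteUser(1)).isFailure
// Skipping afterEach makes the next beforeEach hit the fixed ids again.
val setUpWithoutCleanupFails = Try(setUp(db)).isFailure
```

Because the ids are fixed, skipping the cleanup would make the next `beforeEach` fail on duplicate keys, which is why each test case has to clean up after itself.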
## Next Step
Remove the `user-sys` `enabled` flag and its `if-else` handling logic
completely.
---------
Co-authored-by: Xinyuan Lin <[email protected]>
---
.github/workflows/github-action-build.yml | 4 +
.../ics/amber/engine/e2e/DataProcessingSpec.scala | 51 +++++++------
.../edu/uci/ics/amber/engine/e2e/PauseSpec.scala | 19 ++++-
.../edu/uci/ics/amber/engine/e2e/TestUtils.scala | 89 ++++++++++++++++++++++
core/config/src/main/resources/storage.conf | 6 ++
core/config/src/main/resources/user-system.conf | 2 +-
.../edu/uci/ics/amber/config/StorageConfig.scala | 1 +
.../scala/edu/uci/ics/texera/dao/SqlServer.scala | 4 +
.../edu/uci/ics/texera/dao/MockTexeraDB.scala | 8 +-
core/scripts/sql/texera_ddl.sql | 17 ++++-
10 files changed, 169 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml
index e9f0148cd8..ad8b3cc391 100644
--- a/.github/workflows/github-action-build.yml
+++ b/.github/workflows/github-action-build.yml
@@ -115,6 +115,10 @@ jobs:
psql -h localhost -U postgres -f core/scripts/sql/texera_lakefs.sql
env:
PGPASSWORD: postgres
+ - name: Create texera_db_for_test_cases
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f core/scripts/sql/texera_ddl.sql
+ env:
+ PGPASSWORD: postgres
- name: Compile with sbt
run: cd core && sbt clean package
- name: Run backend tests
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
index 2cf6e53feb..4d3a6ad9d7 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/DataProcessingSpec.scala
@@ -22,23 +22,27 @@ package edu.uci.ics.amber.engine.e2e
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
-import ch.vorburger.mariadb4j.DB
import com.twitter.util.{Await, Duration, Promise}
import edu.uci.ics.amber.clustering.SingleNodeListener
+import edu.uci.ics.amber.core.storage.DocumentFactory
import edu.uci.ics.amber.core.storage.model.VirtualDocument
-import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
-import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple}
-import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity}
-import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity, WorkflowContext}
+import edu.uci.ics.amber.core.virtualidentity.OperatorIdentity
+import edu.uci.ics.amber.core.workflow.{PortIdentity, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.controller._
import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import edu.uci.ics.amber.engine.common.AmberRuntime
import edu.uci.ics.amber.engine.common.client.AmberClient
-import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow
+import edu.uci.ics.amber.engine.e2e.TestUtils.{
+ buildWorkflow,
+ cleanupWorkflowExecutionData,
+ initiateTexeraDBForTestCases,
+ setUpWorkflowExecutionData
+}
import edu.uci.ics.amber.operator.TestOperators
import edu.uci.ics.amber.operator.aggregate.AggregationFunction
+import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import edu.uci.ics.texera.workflow.LogicalLink
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -54,14 +58,22 @@ class DataProcessingSpec
implicit val timeout: Timeout = Timeout(5.seconds)
- var inMemoryMySQLInstance: Option[DB] = None
val workflowContext: WorkflowContext = new WorkflowContext()
+ override protected def beforeEach(): Unit = {
+ setUpWorkflowExecutionData()
+ }
+
+ override protected def afterEach(): Unit = {
+ cleanupWorkflowExecutionData()
+ }
+
override def beforeAll(): Unit = {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
// These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
// Explicitly load the JDBC driver to avoid flaky CI failures.
Class.forName("org.postgresql.Driver")
+ initiateTexeraDBForTestCases()
}
override def afterAll(): Unit = {
@@ -88,30 +100,21 @@ class DataProcessingSpec
if (evt.state == COMPLETED) {
results = workflow.logicalPlan.getTerminalOperatorIds
.filter(terminalOpId => {
- val uri = VFSURIFactory.createResultURI(
- workflowContext.workflowId,
+ val uri = getResultUriByLogicalPortId(
workflowContext.executionId,
- GlobalPortIdentity(
- PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"),
- PortIdentity()
- )
+ terminalOpId,
+ PortIdentity()
)
- // expecting the first output port only.
- ExecutionResourcesMapping
- .getResourceURIs(workflowContext.executionId)
- .contains(uri)
+ uri.nonEmpty
})
.map(terminalOpId => {
//TODO: remove the delay after fixing the issue of reporting "completed" status too early.
Thread.sleep(1000)
- val uri = VFSURIFactory.createResultURI(
- workflowContext.workflowId,
+ val uri = getResultUriByLogicalPortId(
workflowContext.executionId,
- GlobalPortIdentity(
- PhysicalOpIdentity(logicalOpId = terminalOpId, layerName = "main"),
- PortIdentity()
- )
- )
+ terminalOpId,
+ PortIdentity()
+ ).get
terminalOpId -> DocumentFactory
.openDocument(uri)
._1
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala
index bd4164798d..db28c9f459 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/PauseSpec.scala
@@ -31,10 +31,15 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import edu.uci.ics.amber.engine.common.AmberRuntime
import edu.uci.ics.amber.engine.common.client.AmberClient
+import edu.uci.ics.amber.engine.e2e.TestUtils.{
+ cleanupWorkflowExecutionData,
+ initiateTexeraDBForTestCases,
+ setUpWorkflowExecutionData
+}
import edu.uci.ics.amber.operator.{LogicalOp, TestOperators}
import edu.uci.ics.texera.workflow.LogicalLink
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import scala.concurrent.duration._
@@ -42,17 +47,27 @@ class PauseSpec
extends TestKit(ActorSystem("PauseSpec", AmberRuntime.akkaConfig))
with ImplicitSender
with AnyFlatSpecLike
- with BeforeAndAfterAll {
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach {
implicit val timeout: Timeout = Timeout(5.seconds)
val logger = Logger("PauseSpecLogger")
+ override protected def beforeEach(): Unit = {
+ setUpWorkflowExecutionData()
+ }
+
+ override protected def afterEach(): Unit = {
+ cleanupWorkflowExecutionData()
+ }
+
override def beforeAll(): Unit = {
system.actorOf(Props[SingleNodeListener](), "cluster-info")
// These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run.
// Explicitly load the JDBC driver to avoid flaky CI failures.
Class.forName("org.postgresql.Driver")
+ initiateTexeraDBForTestCases()
}
override def afterAll(): Unit = {
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala
index b20c74cb28..c80ece82e2 100644
--- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/e2e/TestUtils.scala
@@ -19,9 +19,24 @@
package edu.uci.ics.amber.engine.e2e
+import edu.uci.ics.amber.config.StorageConfig
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.architecture.controller.Workflow
import edu.uci.ics.amber.operator.LogicalOp
+import edu.uci.ics.texera.dao.SqlServer
+import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
+import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
+ UserDao,
+ WorkflowDao,
+ WorkflowExecutionsDao,
+ WorkflowVersionDao
+}
+import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
+ User,
+ WorkflowExecutions,
+ WorkflowVersion,
+ Workflow => WorkflowPojo
+}
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import edu.uci.ics.texera.workflow.{LogicalLink, WorkflowCompiler}
@@ -40,4 +55,78 @@ object TestUtils {
)
}
+ /**
+ * If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a
+ * workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases.
+ * Note such test cases need to clean up the database at the end of running each test case.
+ */
+ def initiateTexeraDBForTestCases(): Unit = {
+ SqlServer.initConnection(
+ StorageConfig.jdbcUrlForTestCases,
+ StorageConfig.jdbcUsername,
+ StorageConfig.jdbcPassword
+ )
+ }
+
+ val testUser: User = {
+ val user = new User
+ user.setUid(Integer.valueOf(1))
+ user.setName("test_user")
+ user.setRole(UserRoleEnum.ADMIN)
+ user.setPassword("123")
+ user.setEmail("[email protected]")
+ user
+ }
+
+ val testWorkflowEntry: WorkflowPojo = {
+ val workflow = new WorkflowPojo
+ workflow.setName("test workflow")
+ workflow.setWid(Integer.valueOf(1))
+ workflow.setContent("test workflow content")
+ workflow.setDescription("test description")
+ workflow
+ }
+
+ val testWorkflowVersionEntry: WorkflowVersion = {
+ val workflowVersion = new WorkflowVersion
+ workflowVersion.setWid(Integer.valueOf(1))
+ workflowVersion.setVid(Integer.valueOf(1))
+ workflowVersion.setContent("test version content")
+ workflowVersion
+ }
+
+ val testWorkflowExecutionEntry: WorkflowExecutions = {
+ val workflowExecution = new WorkflowExecutions
+ workflowExecution.setEid(Integer.valueOf(1))
+ workflowExecution.setVid(Integer.valueOf(1))
+ workflowExecution.setUid(Integer.valueOf(1))
+ workflowExecution.setStatus(3.toByte)
+ workflowExecution.setEnvironmentVersion("test engine")
+ workflowExecution
+ }
+
+ def setUpWorkflowExecutionData(): Unit = {
+ val dslConfig = SqlServer.getInstance().context.configuration()
+ val userDao = new UserDao(dslConfig)
+ val workflowDao = new WorkflowDao(dslConfig)
+ val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
+ val workflowVersionDao = new WorkflowVersionDao(dslConfig)
+ userDao.insert(testUser)
+ workflowDao.insert(testWorkflowEntry)
+ workflowVersionDao.insert(testWorkflowVersionEntry)
+ workflowExecutionsDao.insert(testWorkflowExecutionEntry)
+ }
+
+ def cleanupWorkflowExecutionData(): Unit = {
+ val dslConfig = SqlServer.getInstance().context.configuration()
+ val userDao = new UserDao(dslConfig)
+ val workflowDao = new WorkflowDao(dslConfig)
+ val workflowExecutionsDao = new WorkflowExecutionsDao(dslConfig)
+ val workflowVersionDao = new WorkflowVersionDao(dslConfig)
+ workflowExecutionsDao.deleteById(1)
+ workflowVersionDao.deleteById(1)
+ workflowDao.deleteById(1)
+ userDao.deleteById(1)
+ }
+
}
diff --git a/core/config/src/main/resources/storage.conf b/core/config/src/main/resources/storage.conf
index 7aba655b46..85a62b77a3 100644
--- a/core/config/src/main/resources/storage.conf
+++ b/core/config/src/main/resources/storage.conf
@@ -125,6 +125,12 @@ storage {
url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db,public"
url = ${?STORAGE_JDBC_URL}
+ # Some e2e test cases require the user system. To make sure running those test cases can pass the CI, and that
+ # running them locally do not contaminate developers' own texera_db, we use another database with a different
+ # name for running these test cases.
+ url-for-test-cases = "jdbc:postgresql://localhost:5432/texera_db_for_test_cases?currentSchema=texera_db,public"
+ url-for-test-cases = ${?STORAGE_JDBC_URL_FOR_TEST_CASES}
+
username = "postgres"
username = ${?STORAGE_JDBC_USERNAME}
diff --git a/core/config/src/main/resources/user-system.conf b/core/config/src/main/resources/user-system.conf
index aa7d65a333..72f5e23d23 100644
--- a/core/config/src/main/resources/user-system.conf
+++ b/core/config/src/main/resources/user-system.conf
@@ -17,7 +17,7 @@
# See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
user-sys {
- enabled = false
+ enabled = true
enabled = ${?USER_SYS_ENABLED}
admin-username = "texera"
diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
index fd369fbb5b..92a8e15d3f 100644
--- a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
+++ b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
@@ -34,6 +34,7 @@ object StorageConfig {
// JDBC specifics
val jdbcUrl: String = conf.getString("storage.jdbc.url")
+ val jdbcUrlForTestCases: String = conf.getString("storage.jdbc.url-for-test-cases")
val jdbcUsername: String = conf.getString("storage.jdbc.username")
val jdbcPassword: String = conf.getString("storage.jdbc.password")
diff --git a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala
index 18eb92b9f3..dd30bc0bda 100644
--- a/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala
+++ b/core/dao/src/main/scala/edu/uci/ics/texera/dao/SqlServer.scala
@@ -62,6 +62,10 @@ object SqlServer {
instance.get
}
+ def clearInstance(): Unit = {
+ instance = None
+ }
+
/**
* A utility function for create a transaction block using given sql context
* @param dsl the sql context
diff --git a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala
index 1bb4951816..cc291f9492 100644
--- a/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala
+++ b/core/dao/src/test/scala/edu/uci/ics/texera/dao/MockTexeraDB.scala
@@ -67,6 +67,7 @@ trait MockTexeraDB {
value.close()
dbInstance = None
dslContext = None
+ SqlServer.clearInstance()
case None =>
// do nothing
}
@@ -92,12 +93,15 @@ trait MockTexeraDB {
} finally {
source.close()
}
- val parts: Array[String] = content.split("(?m)^\\\\c texera_db")
+ val parts: Array[String] = content.split("(?m)^CREATE DATABASE :\"DB_NAME\";")
def removeCCommands(sql: String): String =
sql.linesIterator
.filterNot(_.trim.startsWith("\\c"))
.mkString("\n")
- executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, removeCCommands(parts(0)))
+ val createDBStatement =
+ """DROP DATABASE IF EXISTS texera_db;
+ |CREATE DATABASE texera_db;""".stripMargin
+ executeScriptInJDBC(embedded.getPostgresDatabase.getConnection, createDBStatement)
val texeraDB = embedded.getDatabase(username, database)
var tablesAndIndexCreation = removeCCommands(parts(1))
diff --git a/core/scripts/sql/texera_ddl.sql b/core/scripts/sql/texera_ddl.sql
index b9cac6f545..7b0f9b9063 100644
--- a/core/scripts/sql/texera_ddl.sql
+++ b/core/scripts/sql/texera_ddl.sql
@@ -15,18 +15,29 @@
-- specific language governing permissions and limitations
-- under the License.
+-- ============================================
+-- 0. Specify the database name
+-- (defaults to texera_db)
+-- Override the name with:
+-- psql -v DB_NAME=<alternative_name> ...
+-- ============================================
+\if :{?DB_NAME}
+\else
+ \set DB_NAME 'texera_db'
+\endif
+
-- ============================================
-- 1. Drop and recreate the database (psql only)
-- Remove if you already created texera_db
-- ============================================
\c postgres
-DROP DATABASE IF EXISTS texera_db;
-CREATE DATABASE texera_db;
+DROP DATABASE IF EXISTS :"DB_NAME";
+CREATE DATABASE :"DB_NAME";
-- ============================================
-- 2. Connect to the new database (psql only)
-- ============================================
-\c texera_db
+\c :"DB_NAME"
CREATE SCHEMA IF NOT EXISTS texera_db;
SET search_path TO texera_db, public;