This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-remove-non-user-mode
in repository https://gitbox.apache.org/repos/asf/texera.git

commit d3b7e9b9dc1984fab0c01f0097a305c4b7b21b6c
Author: Xiao-zhen-Liu <[email protected]>
AuthorDate: Mon Oct 6 11:24:48 2025 -0700

    refactor(config): remove user-sys enabled flag
---
 .../uci/ics/texera/web/ComputingUnitMaster.scala   |  44 +++--
 .../uci/ics/texera/web/TexeraWebApplication.scala  |   4 +-
 .../edu/uci/ics/texera/web/auth/JwtAuth.scala      |  31 ++--
 .../texera/web/resource/auth/AuthResource.scala    |   4 -
 .../web/resource/auth/GoogleAuthResource.scala     |   2 -
 .../user/workflow/WorkflowExecutionsResource.scala | 187 +++++++++------------
 .../web/service/ExecutionConsoleService.scala      |  31 ++--
 .../texera/web/service/ExecutionStatsService.scala | 122 ++++++--------
 .../service/ExecutionsMetadataPersistService.scala |   3 -
 .../web/service/WorkflowExecutionService.scala     |   3 -
 .../ics/texera/web/service/WorkflowService.scala   |  49 +++---
 .../texera/service/resource/ConfigResource.scala   |   3 +-
 core/config/src/main/resources/user-system.conf    |   3 -
 .../uci/ics/texera/config/UserSystemConfig.scala   |   1 -
 .../storage/result/ExecutionResourcesMapping.scala |  75 ---------
 15 files changed, 204 insertions(+), 358 deletions(-)

diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
index 7760267311..6de230ddd5 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala
@@ -156,30 +156,28 @@ class ComputingUnitMaster extends 
io.dropwizard.Application[Configuration] with
         new 
WebsocketPayloadSizeTuner(ApplicationConfig.maxWorkflowWebsocketRequestPayloadSizeKb)
       )
 
-    if (UserSystemConfig.isUserSystemEnabled) {
-      val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs
-      if (ApplicationConfig.cleanupAllExecutionResults) {
-        // do one time cleanup of collections that were not closed gracefully 
before restart/crash
-        // retrieve all executions that were executing before the reboot.
-        val allExecutionsBeforeRestart: List[WorkflowExecutions] =
-          WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1)
-        cleanExecutions(
-          allExecutionsBeforeRestart,
-          statusByte => {
-            if (statusByte != maptoStatusCode(COMPLETED)) {
-              maptoStatusCode(FAILED) // for incomplete executions, mark them 
as failed.
-            } else {
-              statusByte
-            }
+    val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs
+    if (ApplicationConfig.cleanupAllExecutionResults) {
+      // do one time cleanup of collections that were not closed gracefully 
before restart/crash
+      // retrieve all executions that were executing before the reboot.
+      val allExecutionsBeforeRestart: List[WorkflowExecutions] =
+        WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1)
+      cleanExecutions(
+        allExecutionsBeforeRestart,
+        statusByte => {
+          if (statusByte != maptoStatusCode(COMPLETED)) {
+            maptoStatusCode(FAILED) // for incomplete executions, mark them as 
failed.
+          } else {
+            statusByte
           }
-        )
-      }
-      scheduleRecurringCallThroughActorSystem(
-        2.seconds,
-        ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds
-      ) {
-        recurringCheckExpiredResults(timeToLive)
-      }
+        }
+      )
+    }
+    scheduleRecurringCallThroughActorSystem(
+      2.seconds,
+      ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds
+    ) {
+      recurringCheckExpiredResults(timeToLive)
     }
 
     environment.jersey.register(classOf[WorkflowExecutionsResource])
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
index a74b40c671..577c4aff4f 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala
@@ -152,8 +152,6 @@ class TexeraWebApplication
     environment.jersey.register(classOf[AdminSettingsResource])
     environment.jersey.register(classOf[AIAssistantResource])
 
-    if (UserSystemConfig.isUserSystemEnabled) {
-      AuthResource.createAdminUser()
-    }
+    AuthResource.createAdminUser()
   }
 }
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
index 0847102929..3a4488c3fb 100644
--- a/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala
@@ -30,26 +30,17 @@ import io.dropwizard.setup.Environment
 @Deprecated
 object JwtAuth {
   def setupJwtAuth(environment: Environment): Unit = {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      // register JWT Auth layer
-      environment.jersey.register(
-        new AuthDynamicFeature(
-          new JwtAuthFilter.Builder[SessionUser]()
-            .setJwtConsumer(jwtConsumer)
-            .setRealm("realm")
-            .setPrefix("Bearer")
-            .setAuthenticator(UserAuthenticator)
-            .setAuthorizer(UserRoleAuthorizer)
-            .buildAuthFilter()
-        )
+    // register JWT Auth layer
+    environment.jersey.register(
+      new AuthDynamicFeature(
+        new JwtAuthFilter.Builder[SessionUser]()
+          .setJwtConsumer(jwtConsumer)
+          .setRealm("realm")
+          .setPrefix("Bearer")
+          .setAuthenticator(UserAuthenticator)
+          .setAuthorizer(UserRoleAuthorizer)
+          .buildAuthFilter()
       )
-    } else {
-      // register Guest Auth layer
-      environment.jersey.register(
-        new AuthDynamicFeature(
-          new 
GuestAuthFilter.Builder().setAuthorizer(UserRoleAuthorizer).buildAuthFilter()
-        )
-      )
-    }
+    )
   }
 }
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
index ccccdd768a..23e0bb04e8 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala
@@ -89,8 +89,6 @@ class AuthResource {
   @POST
   @Path("/login")
   def login(request: UserLoginRequest): TokenIssueResponse = {
-    if (!UserSystemConfig.isUserSystemEnabled)
-      throw new NotAcceptableException("User System is disabled on the 
backend!")
     retrieveUserByUsernameAndPassword(request.username, request.password) 
match {
       case Some(user) =>
         TokenIssueResponse(jwtToken(jwtClaims(user, 
TOKEN_EXPIRE_TIME_IN_MINUTES)))
@@ -101,8 +99,6 @@ class AuthResource {
   @POST
   @Path("/register")
   def register(request: UserRegistrationRequest): TokenIssueResponse = {
-    if (!UserSystemConfig.isUserSystemEnabled)
-      throw new NotAcceptableException("User System is disabled on the 
backend!")
     val username = request.username
     if (username == null) throw new NotAcceptableException("Username cannot be 
null.")
     if (username.trim.isEmpty) throw new NotAcceptableException("Username 
cannot be empty.")
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
index d49d4d41a5..3bdc0f02eb 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala
@@ -57,8 +57,6 @@ class GoogleAuthResource {
   @Produces(Array(MediaType.APPLICATION_JSON))
   @Path("/login")
   def login(credential: String): TokenIssueResponse = {
-    if (!UserSystemConfig.isUserSystemEnabled)
-      throw new NotAcceptableException("User System is disabled on the 
backend!")
     val idToken =
       new GoogleIdTokenVerifier.Builder(new NetHttpTransport, 
GsonFactory.getDefaultInstance)
         .setAudience(
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
index c3b63a2df8..24433b76cd 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala
@@ -19,7 +19,6 @@
 
 package edu.uci.ics.texera.web.resource.dashboard.user.workflow
 
-import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
 import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSResourceType, 
VFSURIFactory}
 import edu.uci.ics.amber.core.tuple.Tuple
 import edu.uci.ics.amber.core.virtualidentity._
@@ -28,13 +27,12 @@ import 
edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, Repla
 import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, 
stringToAggregatedState}
 import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
 import edu.uci.ics.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
+import edu.uci.ics.texera.auth.SessionUser
 import edu.uci.ics.texera.dao.SqlServer
+import edu.uci.ics.texera.dao.SqlServer.withTransaction
 import edu.uci.ics.texera.dao.jooq.generated.Tables._
 import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao
 import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
-import edu.uci.ics.texera.auth.SessionUser
-import edu.uci.ics.texera.config.UserSystemConfig
-import edu.uci.ics.texera.dao.SqlServer.withTransaction
 import edu.uci.ics.texera.web.model.http.request.result.ResultExportRequest
 import 
edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
 import edu.uci.ics.texera.web.service.{ExecutionsMetadataPersistService, 
ResultExportService}
@@ -107,19 +105,15 @@ object WorkflowExecutionsResource {
       globalPortId: GlobalPortIdentity,
       uri: URI
   ): Unit = {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      context
-        .insertInto(OPERATOR_PORT_EXECUTIONS)
-        .columns(
-          OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
-          OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID,
-          OPERATOR_PORT_EXECUTIONS.RESULT_URI
-        )
-        .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
-        .execute()
-    } else {
-      ExecutionResourcesMapping.addResourceUri(eid, uri)
-    }
+    context
+      .insertInto(OPERATOR_PORT_EXECUTIONS)
+      .columns(
+        OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+        OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID,
+        OPERATOR_PORT_EXECUTIONS.RESULT_URI
+      )
+      .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
+      .execute()
   }
 
   def insertOperatorExecutions(
@@ -158,45 +152,37 @@ object WorkflowExecutionsResource {
   }
 
   def getResultUrisByExecutionId(eid: ExecutionIdentity): List[URI] = {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      context
-        .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
-        .from(OPERATOR_PORT_EXECUTIONS)
-        .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
-        .fetchInto(classOf[String])
-        .asScala
-        .toList
-        .filter(uri => uri != null && uri.nonEmpty)
-        .map(URI.create)
-    } else {
-      ExecutionResourcesMapping.getResourceURIs(eid)
-    }
+    context
+      .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
+      .from(OPERATOR_PORT_EXECUTIONS)
+      .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .fetchInto(classOf[String])
+      .asScala
+      .toList
+      .filter(uri => uri != null && uri.nonEmpty)
+      .map(URI.create)
   }
 
   def getConsoleMessagesUriByExecutionId(eid: ExecutionIdentity): List[URI] =
-    if (UserSystemConfig.isUserSystemEnabled)
-      context
-        .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
-        .from(OPERATOR_EXECUTIONS)
-        .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
-        .fetchInto(classOf[String])
-        .asScala
-        .toList
-        .filter(uri => uri != null && uri.nonEmpty)
-        .map(URI.create)
-    else Nil
+    context
+      .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
+      .from(OPERATOR_EXECUTIONS)
+      .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .fetchInto(classOf[String])
+      .asScala
+      .toList
+      .filter(uri => uri != null && uri.nonEmpty)
+      .map(URI.create)
 
   def getRuntimeStatsUriByExecutionId(eid: ExecutionIdentity): Option[URI] =
-    if (UserSystemConfig.isUserSystemEnabled)
-      Option(
-        context
-          .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
-          .from(WORKFLOW_EXECUTIONS)
-          .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
-          .fetchOneInto(classOf[String])
-      ).filter(_.nonEmpty)
-        .map(URI.create)
-    else None
+    Option(
+      context
+        .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+        .from(WORKFLOW_EXECUTIONS)
+        .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
+        .fetchOneInto(classOf[String])
+    ).filter(_.nonEmpty)
+      .map(URI.create)
 
   def getWorkflowExecutions(
       wid: Integer,
@@ -239,18 +225,14 @@ object WorkflowExecutionsResource {
   }
 
   def deleteConsoleMessageAndExecutionResultUris(eid: ExecutionIdentity): Unit 
= {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      context
-        .delete(OPERATOR_PORT_EXECUTIONS)
-        .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
-        .execute()
-      context
-        .delete(OPERATOR_EXECUTIONS)
-        .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
-        .execute()
-    } else {
-      ExecutionResourcesMapping.removeExecutionResources(eid)
-    }
+    context
+      .delete(OPERATOR_PORT_EXECUTIONS)
+      .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .execute()
+    context
+      .delete(OPERATOR_EXECUTIONS)
+      .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .execute()
   }
 
   /**
@@ -316,22 +298,20 @@ object WorkflowExecutionsResource {
     * @param eid Execution ID associated with the runtime statistics document.
     */
   def updateRuntimeStatsSize(eid: ExecutionIdentity): Unit = {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      val statsUriOpt = context
-        .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
-        .from(WORKFLOW_EXECUTIONS)
-        .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
-        .fetchOptionalInto(classOf[String])
-        .map(URI.create)
+    val statsUriOpt = context
+      .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)
+      .from(WORKFLOW_EXECUTIONS)
+      .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
+      .fetchOptionalInto(classOf[String])
+      .map(URI.create)
 
-      if (statsUriOpt.isPresent) {
-        val size = 
DocumentFactory.openDocument(statsUriOpt.get)._1.getTotalFileSize
-        context
-          .update(WORKFLOW_EXECUTIONS)
-          .set(WORKFLOW_EXECUTIONS.RUNTIME_STATS_SIZE, 
Integer.valueOf(size.toInt))
-          .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
-          .execute()
-      }
+    if (statsUriOpt.isPresent) {
+      val size = 
DocumentFactory.openDocument(statsUriOpt.get)._1.getTotalFileSize
+      context
+        .update(WORKFLOW_EXECUTIONS)
+        .set(WORKFLOW_EXECUTIONS.RUNTIME_STATS_SIZE, 
Integer.valueOf(size.toInt))
+        .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt))
+        .execute()
     }
   }
 
@@ -342,24 +322,22 @@ object WorkflowExecutionsResource {
     * @param opId Operator ID of the corresponding operator.
     */
   def updateConsoleMessageSize(eid: ExecutionIdentity, opId: 
OperatorIdentity): Unit = {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      val uriOpt = context
-        .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
-        .from(OPERATOR_EXECUTIONS)
+    val uriOpt = context
+      .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI)
+      .from(OPERATOR_EXECUTIONS)
+      .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+      .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
+      .fetchOptionalInto(classOf[String])
+      .map(URI.create)
+
+    if (uriOpt.isPresent) {
+      val size = DocumentFactory.openDocument(uriOpt.get)._1.getTotalFileSize
+      context
+        .update(OPERATOR_EXECUTIONS)
+        .set(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_SIZE, 
Integer.valueOf(size.toInt))
         .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
         .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
-        .fetchOptionalInto(classOf[String])
-        .map(URI.create)
-
-      if (uriOpt.isPresent) {
-        val size = DocumentFactory.openDocument(uriOpt.get)._1.getTotalFileSize
-        context
-          .update(OPERATOR_EXECUTIONS)
-          .set(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_SIZE, 
Integer.valueOf(size.toInt))
-          .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
-          .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id))
-          .execute()
-      }
+        .execute()
     }
   }
 
@@ -368,7 +346,6 @@ object WorkflowExecutionsResource {
     * this method finds the URI for a globalPortId that both: 1. matches the 
logicalOpId and outputPortId, and
     * 2. is an external port. Currently the lookup is O(n), where n is the 
number of globalPortIds for this execution.
     * TODO: Optimize the lookup once the frontend also has information about 
physical operators.
-    * TODO: Remove the case of using ExecutionResourceMapping when user system 
is permenantly enabled even in dev mode.
     */
   def getResultUriByLogicalPortId(
       eid: ExecutionIdentity,
@@ -386,18 +363,14 @@ object WorkflowExecutionsResource {
     }
 
     val urisOfEid: List[URI] =
-      if (UserSystemConfig.isUserSystemEnabled) {
-        context
-          .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
-          .from(OPERATOR_PORT_EXECUTIONS)
-          
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
-          .fetchInto(classOf[String])
-          .asScala
-          .toList
-          .map(URI.create)
-      } else {
-        ExecutionResourcesMapping.getResourceURIs(eid)
-      }
+      context
+        .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI)
+        .from(OPERATOR_PORT_EXECUTIONS)
+        .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt))
+        .fetchInto(classOf[String])
+        .asScala
+        .toList
+        .map(URI.create)
 
     urisOfEid.find(isMatchingExternalPortURI)
   }
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
index f69a1c81d3..929689fcf7 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala
@@ -139,8 +139,7 @@ class ExecutionConsoleService(
   private val consoleMessageOpIdToWriterMap: mutable.Map[String, 
BufferedItemWriter[Tuple]] =
     mutable.Map()
 
-  private val consoleWriterThread: Option[ExecutorService] =
-    
Option.when(UserSystemConfig.isUserSystemEnabled)(Executors.newSingleThreadExecutor())
+  private val consoleWriterThread: ExecutorService = 
Executors.newSingleThreadExecutor()
 
   private def getOrCreateWriter(opId: OperatorIdentity): 
BufferedItemWriter[Tuple] = {
     consoleMessageOpIdToWriterMap.getOrElseUpdate(
@@ -261,21 +260,19 @@ class ExecutionConsoleService(
       consoleMessage: ConsoleMessage
   ): ExecutionConsoleStore = {
     // Write the original full message to the database
-    consoleWriterThread.foreach { thread =>
-      thread.execute(() => {
-        val writer = getOrCreateWriter(OperatorIdentity(opId))
-        try {
-          val tuple = new Tuple(
-            ResultSchema.consoleMessagesSchema,
-            Array(consoleMessage.toProtoString)
-          )
-          writer.putOne(tuple)
-        } catch {
-          case e: Exception =>
-            logger.error(s"Error while writing console message for operator 
$opId", e)
-        }
-      })
-    }
+    consoleWriterThread.execute(() => {
+      val writer = getOrCreateWriter(OperatorIdentity(opId))
+      try {
+        val tuple = new Tuple(
+          ResultSchema.consoleMessagesSchema,
+          Array(consoleMessage.toProtoString)
+        )
+        writer.putOne(tuple)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Error while writing console message for operator 
$opId", e)
+      }
+    })
 
     // Process the message (truncate if needed) and update store
     val truncatedMessage = processConsoleMessage(consoleMessage)
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
index 6aabe7c4eb..9235fdaabf 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala
@@ -72,30 +72,26 @@ class ExecutionStatsService(
 ) extends SubscriptionManager
     with LazyLogging {
   private val (metricsPersistThread, runtimeStatsWriter) = {
-    if (UserSystemConfig.isUserSystemEnabled) {
-      val thread = Executors.newSingleThreadExecutor()
-      val uri = VFSURIFactory.createRuntimeStatisticsURI(
-        workflowContext.workflowId,
-        workflowContext.executionId
-      )
-      val writer = DocumentFactory
-        .createDocument(uri, ResultSchema.runtimeStatisticsSchema)
-        .writer("runtime_statistics")
-        .asInstanceOf[BufferedItemWriter[Tuple]]
-      WorkflowExecutionsResource.updateRuntimeStatsUri(
-        workflowContext.workflowId.id,
-        workflowContext.executionId.id,
-        uri
-      )
-      writer.open()
-      (Some(thread), Some(writer))
-    } else {
-      (None, None)
-    }
+    val thread = Executors.newSingleThreadExecutor()
+    val uri = VFSURIFactory.createRuntimeStatisticsURI(
+      workflowContext.workflowId,
+      workflowContext.executionId
+    )
+    val writer = DocumentFactory
+      .createDocument(uri, ResultSchema.runtimeStatisticsSchema)
+      .writer("runtime_statistics")
+      .asInstanceOf[BufferedItemWriter[Tuple]]
+    WorkflowExecutionsResource.updateRuntimeStatsUri(
+      workflowContext.workflowId.id,
+      workflowContext.executionId.id,
+      uri
+    )
+    writer.open()
+    (thread, writer)
   }
 
-  private var lastPersistedMetrics: Option[Map[String, OperatorMetrics]] =
-    Option.when(UserSystemConfig.isUserSystemEnabled)(Map.empty[String, 
OperatorMetrics])
+  private var lastPersistedMetrics: Map[String, OperatorMetrics] =
+    Map.empty[String, OperatorMetrics]
 
   registerCallbacks()
 
@@ -189,12 +185,10 @@ class ExecutionStatsService(
           stateStore.statsStore.updateState { statsStore =>
             statsStore.withOperatorInfo(evt.operatorMetrics)
           }
-          metricsPersistThread.foreach { thread =>
-            thread.execute(() => {
-              storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
-              lastPersistedMetrics = Some(evt.operatorMetrics)
-            })
-          }
+          metricsPersistThread.execute(() => {
+            storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics))
+            lastPersistedMetrics = evt.operatorMetrics
+          })
         })
     )
   }
@@ -204,13 +198,11 @@ class ExecutionStatsService(
       case ExecutionStateUpdate(state: WorkflowAggregatedState.Recognized)
           if Set(COMPLETED, FAILED, KILLED).contains(state) =>
         logger.info("Workflow execution terminated. Commit runtime 
statistics.")
-        runtimeStatsWriter.foreach { writer =>
-          try {
-            writer.close()
-          } catch {
-            case e: Exception =>
-              logger.error("Failed to close runtime statistics writer", e)
-          }
+        try {
+          runtimeStatsWriter.close()
+        } catch {
+          case e: Exception =>
+            logger.error("Failed to close runtime statistics writer", e)
         }
       case _ =>
     }
@@ -225,15 +217,12 @@ class ExecutionStatsService(
       OperatorStatistics(Seq.empty, Seq.empty, 0, 0, 0, 0)
     )
 
-    // Retrieve the last persisted metrics or default to an empty map
-    val lastMetrics = lastPersistedMetrics.getOrElse(Map.empty)
-
     // Determine new and old keys
-    val newKeys = newMetrics.keySet.diff(lastMetrics.keySet)
-    val oldKeys = lastMetrics.keySet.diff(newMetrics.keySet)
+    val newKeys = newMetrics.keySet.diff(lastPersistedMetrics.keySet)
+    val oldKeys = lastPersistedMetrics.keySet.diff(newMetrics.keySet)
 
     // Update last metrics with default metrics for new keys
-    val updatedLastMetrics = lastMetrics ++ newKeys.map(_ -> defaultMetrics)
+    val updatedLastMetrics = lastPersistedMetrics ++ newKeys.map(_ -> 
defaultMetrics)
 
     // Combine new metrics with old metrics for keys that are no longer present
     val completeMetricsMap = newMetrics ++ oldKeys.map(key => key -> 
updatedLastMetrics(key))
@@ -258,34 +247,29 @@ class ExecutionStatsService(
   private def storeRuntimeStatistics(
       operatorStatistics: scala.collection.immutable.Map[String, 
OperatorMetrics]
   ): Unit = {
-    runtimeStatsWriter match {
-      case Some(writer) =>
-        try {
-          operatorStatistics.foreach {
-            case (operatorId, stat) =>
-              val runtimeStats = new Tuple(
-                ResultSchema.runtimeStatisticsSchema,
-                Array(
-                  operatorId,
-                  new java.sql.Timestamp(System.currentTimeMillis()),
-                  
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
-                  
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
-                  
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
-                  
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
-                  stat.operatorStatistics.dataProcessingTime,
-                  stat.operatorStatistics.controlProcessingTime,
-                  stat.operatorStatistics.idleTime,
-                  stat.operatorStatistics.numWorkers,
-                  maptoStatusCode(stat.operatorState).toInt
-                )
-              )
-              writer.putOne(runtimeStats)
-          }
-        } catch {
-          case err: Throwable => logger.error("error occurred when storing 
runtime statistics", err)
-        }
-      case None =>
-        logger.warn("Runtime statistics writer is not available.")
+    try {
+      operatorStatistics.foreach {
+        case (operatorId, stat) =>
+          val runtimeStats = new Tuple(
+            ResultSchema.runtimeStatisticsSchema,
+            Array(
+              operatorId,
+              new java.sql.Timestamp(System.currentTimeMillis()),
+              
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
+              
stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
+              
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
+              
stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
+              stat.operatorStatistics.dataProcessingTime,
+              stat.operatorStatistics.controlProcessingTime,
+              stat.operatorStatistics.idleTime,
+              stat.operatorStatistics.numWorkers,
+              maptoStatusCode(stat.operatorState).toInt
+            )
+          )
+          runtimeStatsWriter.putOne(runtimeStats)
+      }
+    } catch {
+      case err: Throwable => logger.error("error occurred when storing runtime 
statistics", err)
     }
   }
 
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
index c1f49a8470..16f7e3cd7a 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala
@@ -57,7 +57,6 @@ object ExecutionsMetadataPersistService extends LazyLogging {
       environmentVersion: String,
       computingUnitId: Integer
   ): ExecutionIdentity = {
-    if (!UserSystemConfig.isUserSystemEnabled) return DEFAULT_EXECUTION_ID
     // first retrieve the latest version of this workflow
     val vid = getLatestVersion(workflowId.id.toInt)
     val newExecution = new WorkflowExecutions()
@@ -77,7 +76,6 @@ object ExecutionsMetadataPersistService extends LazyLogging {
   }
 
   def tryGetExistingExecution(executionId: ExecutionIdentity): 
Option[WorkflowExecutions] = {
-    if (!UserSystemConfig.isUserSystemEnabled) return None
     try {
       Some(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt))
     } catch {
@@ -90,7 +88,6 @@ object ExecutionsMetadataPersistService extends LazyLogging {
   def tryUpdateExistingExecution(
       executionId: ExecutionIdentity
   )(updateFunc: WorkflowExecutions => Unit): Unit = {
-    if (!UserSystemConfig.isUserSystemEnabled) return
     try {
       val execution = workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)
       updateFunc(execution)
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
index b6c8cbb88e..4430180d4b 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala
@@ -50,9 +50,6 @@ object WorkflowExecutionService {
       workflowId: WorkflowIdentity,
       computingUnitId: Int
   ): Option[ExecutionIdentity] = {
-    if (!UserSystemConfig.isUserSystemEnabled) {
-      return Some(DEFAULT_EXECUTION_ID)
-    }
     WorkflowExecutionsResource
       .getLatestExecutionID(workflowId.id.toInt, computingUnitId)
       .map(eid => new ExecutionIdentity(eid.longValue()))
diff --git 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
index 1fee982fde..a5f47e1153 100644
--- 
a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
+++ 
b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala
@@ -207,35 +207,32 @@ class WorkflowService(
       req.computingUnitId
     )
 
-    if (UserSystemConfig.isUserSystemEnabled) {
-      // enable only if we have mysql
-      if (ApplicationConfig.faultToleranceLogRootFolder.isDefined) {
-        val writeLocation = 
ApplicationConfig.faultToleranceLogRootFolder.get.resolve(
-          s"${workflowContext.workflowId}/${workflowContext.executionId}/"
-        )
-        
ExecutionsMetadataPersistService.tryUpdateExistingExecution(workflowContext.executionId)
 {
-          execution => execution.setLogLocation(writeLocation.toString)
-        }
-        controllerConf = controllerConf.copy(faultToleranceConfOpt =
-          Some(FaultToleranceConfig(writeTo = writeLocation))
-        )
+    if (ApplicationConfig.faultToleranceLogRootFolder.isDefined) {
+      val writeLocation = 
ApplicationConfig.faultToleranceLogRootFolder.get.resolve(
+        s"${workflowContext.workflowId}/${workflowContext.executionId}/"
+      )
+      
ExecutionsMetadataPersistService.tryUpdateExistingExecution(workflowContext.executionId)
 {
+        execution => execution.setLogLocation(writeLocation.toString)
       }
-      if (req.replayFromExecution.isDefined) {
-        val replayInfo = req.replayFromExecution.get
-        ExecutionsMetadataPersistService
-          .tryGetExistingExecution(ExecutionIdentity(replayInfo.eid))
-          .foreach { execution =>
-            val readLocation = new URI(execution.getLogLocation)
-            controllerConf = controllerConf.copy(stateRestoreConfOpt =
-              Some(
-                StateRestoreConfig(
-                  readFrom = readLocation,
-                  replayDestination = 
EmbeddedControlMessageIdentity(replayInfo.interaction)
-                )
+      controllerConf = controllerConf.copy(faultToleranceConfOpt =
+        Some(FaultToleranceConfig(writeTo = writeLocation))
+      )
+    }
+    if (req.replayFromExecution.isDefined) {
+      val replayInfo = req.replayFromExecution.get
+      ExecutionsMetadataPersistService
+        .tryGetExistingExecution(ExecutionIdentity(replayInfo.eid))
+        .foreach { execution =>
+          val readLocation = new URI(execution.getLogLocation)
+          controllerConf = controllerConf.copy(stateRestoreConfOpt =
+            Some(
+              StateRestoreConfig(
+                readFrom = readLocation,
+                replayDestination = 
EmbeddedControlMessageIdentity(replayInfo.interaction)
               )
             )
-          }
-      }
+          )
+        }
     }
 
     val executionStateStore = new ExecutionStateStore()
diff --git 
a/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
 
b/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
index 3270e39af1..269001e6a3 100644
--- 
a/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
+++ 
b/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala
@@ -65,7 +65,6 @@ class ConfigResource {
   def getUserSystemConfig: Map[String, Any] =
     Map(
       // flags from the user-system.conf
-      "inviteOnly" -> UserSystemConfig.inviteOnly,
-      "userSystemEnabled" -> UserSystemConfig.isUserSystemEnabled
+      "inviteOnly" -> UserSystemConfig.inviteOnly
     )
 }
diff --git a/core/config/src/main/resources/user-system.conf 
b/core/config/src/main/resources/user-system.conf
index 72f5e23d23..166ce2fc8f 100644
--- a/core/config/src/main/resources/user-system.conf
+++ b/core/config/src/main/resources/user-system.conf
@@ -17,9 +17,6 @@
 
 # See PR https://github.com/Texera/texera/pull/3326 for configuration 
guidelines.
 user-sys {
-  enabled = true
-  enabled = ${?USER_SYS_ENABLED}
-
   admin-username = "texera"
   admin-username = ${?USER_SYS_ADMIN_USERNAME}
 
diff --git 
a/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala 
b/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala
index d64ee7ff4c..eb7b91ad6e 100644
--- 
a/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala
+++ 
b/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala
@@ -27,7 +27,6 @@ object UserSystemConfig {
   private val logger = Logger.getLogger(getClass.getName)
 
   // User system
-  val isUserSystemEnabled: Boolean = conf.getBoolean("user-sys.enabled")
   val adminUsername: String = conf.getString("user-sys.admin-username")
   val adminPassword: String = conf.getString("user-sys.admin-password")
   val googleClientId: String = conf.getString("user-sys.google.clientId")
diff --git 
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala
 
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala
deleted file mode 100644
index faf7183473..0000000000
--- 
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package edu.uci.ics.amber.core.storage.result
-
-import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity
-
-import java.net.URI
-import scala.collection.mutable
-
-/**
-  * ExecutionResourcesMapping is a singleton for keeping track of resources 
associated with each execution.
-  *   It maintains a mapping from execution ID to a list of URIs, which point 
to resources like the result storage.
-  *
-  * Currently, this mapping is only used during the resource clean-up phase.
-  *
-  * This design has one limitation: the singleton is only accessible on the 
master node.
-  * While this aligns with the current system design, improvements are needed 
in the
-  * future to enhance scalability and flexibility.
-  *
-  * TODO: Move the mappings to an external, distributed, and persistent 
location to eliminate the master-node
-  *   dependency.
-  */
-object ExecutionResourcesMapping {
-
-  private val executionIdToExecutionResourcesMapping: 
mutable.Map[ExecutionIdentity, List[URI]] =
-    mutable.Map.empty
-
-  /**
-    * Get the URIs of given execution Id
-    * @param executionIdentity the target execution id
-    * @return
-    */
-  def getResourceURIs(executionIdentity: ExecutionIdentity): List[URI] = {
-    executionIdToExecutionResourcesMapping.getOrElseUpdate(executionIdentity, 
List())
-  }
-
-  /**
-    * Add the URI to the mapping
-    * @param executionIdentity the target execution
-    * @param uri the URI of the resource
-    */
-  def addResourceUri(executionIdentity: ExecutionIdentity, uri: URI): Unit = {
-    executionIdToExecutionResourcesMapping.updateWith(executionIdentity) {
-      case Some(existingUris) => Some(uri :: existingUris) // Prepend URI to 
the existing list
-      case None               => Some(List(uri)) // Create a new list if key 
doesn't exist
-    }
-  }
-
-  /**
-    * Remove all resources associated with a given execution ID.
-    *
-    * @param executionIdentity the target execution ID
-    * @return true if the entry was removed, false if it did not exist
-    */
-  def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean 
= {
-    executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined
-  }
-}


Reply via email to