This is an automated email from the ASF dual-hosted git repository. xiaozhenliu pushed a commit to branch xiaozhen-remove-non-user-mode in repository https://gitbox.apache.org/repos/asf/texera.git
commit d3b7e9b9dc1984fab0c01f0097a305c4b7b21b6c Author: Xiao-zhen-Liu <[email protected]> AuthorDate: Mon Oct 6 11:24:48 2025 -0700 refactor(config): remove user-sys enabled flag --- .../uci/ics/texera/web/ComputingUnitMaster.scala | 44 +++-- .../uci/ics/texera/web/TexeraWebApplication.scala | 4 +- .../edu/uci/ics/texera/web/auth/JwtAuth.scala | 31 ++-- .../texera/web/resource/auth/AuthResource.scala | 4 - .../web/resource/auth/GoogleAuthResource.scala | 2 - .../user/workflow/WorkflowExecutionsResource.scala | 187 +++++++++------------ .../web/service/ExecutionConsoleService.scala | 31 ++-- .../texera/web/service/ExecutionStatsService.scala | 122 ++++++-------- .../service/ExecutionsMetadataPersistService.scala | 3 - .../web/service/WorkflowExecutionService.scala | 3 - .../ics/texera/web/service/WorkflowService.scala | 49 +++--- .../texera/service/resource/ConfigResource.scala | 3 +- core/config/src/main/resources/user-system.conf | 3 - .../uci/ics/texera/config/UserSystemConfig.scala | 1 - .../storage/result/ExecutionResourcesMapping.scala | 75 --------- 15 files changed, 204 insertions(+), 358 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala index 7760267311..6de230ddd5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala @@ -156,30 +156,28 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with new WebsocketPayloadSizeTuner(ApplicationConfig.maxWorkflowWebsocketRequestPayloadSizeKb) ) - if (UserSystemConfig.isUserSystemEnabled) { - val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs - if (ApplicationConfig.cleanupAllExecutionResults) { - // do one time cleanup of collections that were not closed gracefully before restart/crash - // retrieve all executions that were executing before the reboot. - val allExecutionsBeforeRestart: List[WorkflowExecutions] = - WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1) - cleanExecutions( - allExecutionsBeforeRestart, - statusByte => { - if (statusByte != maptoStatusCode(COMPLETED)) { - maptoStatusCode(FAILED) // for incomplete executions, mark them as failed. - } else { - statusByte - } + val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs + if (ApplicationConfig.cleanupAllExecutionResults) { + // do one time cleanup of collections that were not closed gracefully before restart/crash + // retrieve all executions that were executing before the reboot. + val allExecutionsBeforeRestart: List[WorkflowExecutions] = + WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1) + cleanExecutions( + allExecutionsBeforeRestart, + statusByte => { + if (statusByte != maptoStatusCode(COMPLETED)) { + maptoStatusCode(FAILED) // for incomplete executions, mark them as failed. + } else { + statusByte } - ) - } - scheduleRecurringCallThroughActorSystem( - 2.seconds, - ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds - ) { - recurringCheckExpiredResults(timeToLive) - } + } + ) + } + scheduleRecurringCallThroughActorSystem( + 2.seconds, + ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds + ) { + recurringCheckExpiredResults(timeToLive) } environment.jersey.register(classOf[WorkflowExecutionsResource]) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala index a74b40c671..577c4aff4f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/TexeraWebApplication.scala @@ -152,8 +152,6 @@ class TexeraWebApplication environment.jersey.register(classOf[AdminSettingsResource]) environment.jersey.register(classOf[AIAssistantResource]) - if (UserSystemConfig.isUserSystemEnabled) { - AuthResource.createAdminUser() - } + AuthResource.createAdminUser() } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala index 0847102929..3a4488c3fb 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/auth/JwtAuth.scala @@ -30,26 +30,17 @@ import io.dropwizard.setup.Environment @Deprecated object JwtAuth { def setupJwtAuth(environment: Environment): Unit = { - if (UserSystemConfig.isUserSystemEnabled) { - // register JWT Auth layer - environment.jersey.register( - new AuthDynamicFeature( - new JwtAuthFilter.Builder[SessionUser]() - .setJwtConsumer(jwtConsumer) - .setRealm("realm") - .setPrefix("Bearer") - .setAuthenticator(UserAuthenticator) - .setAuthorizer(UserRoleAuthorizer) - .buildAuthFilter() - ) + // register JWT Auth layer + environment.jersey.register( + new AuthDynamicFeature( + new JwtAuthFilter.Builder[SessionUser]() + .setJwtConsumer(jwtConsumer) + .setRealm("realm") + .setPrefix("Bearer") + .setAuthenticator(UserAuthenticator) + .setAuthorizer(UserRoleAuthorizer) + .buildAuthFilter() ) - } else { - // register Guest Auth layer - environment.jersey.register( - new AuthDynamicFeature( - new GuestAuthFilter.Builder().setAuthorizer(UserRoleAuthorizer).buildAuthFilter() - ) - ) - } + ) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala index ccccdd768a..23e0bb04e8 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/AuthResource.scala @@ -89,8 +89,6 @@ class AuthResource { @POST @Path("/login") def login(request: UserLoginRequest): TokenIssueResponse = { - if (!UserSystemConfig.isUserSystemEnabled) - throw new NotAcceptableException("User System is disabled on the backend!") retrieveUserByUsernameAndPassword(request.username, request.password) match { case Some(user) => TokenIssueResponse(jwtToken(jwtClaims(user, TOKEN_EXPIRE_TIME_IN_MINUTES))) @@ -101,8 +99,6 @@ class AuthResource { @POST @Path("/register") def register(request: UserRegistrationRequest): TokenIssueResponse = { - if (!UserSystemConfig.isUserSystemEnabled) - throw new NotAcceptableException("User System is disabled on the backend!") val username = request.username if (username == null) throw new NotAcceptableException("Username cannot be null.") if (username.trim.isEmpty) throw new NotAcceptableException("Username cannot be empty.") diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala index d49d4d41a5..3bdc0f02eb 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/auth/GoogleAuthResource.scala @@ -57,8 +57,6 @@ class GoogleAuthResource { @Produces(Array(MediaType.APPLICATION_JSON)) @Path("/login") def login(credential: String): TokenIssueResponse = { - if (!UserSystemConfig.isUserSystemEnabled) - throw new NotAcceptableException("User System is disabled on the backend!") val idToken = new GoogleIdTokenVerifier.Builder(new NetHttpTransport, GsonFactory.getDefaultInstance) .setAudience( diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index c3b63a2df8..24433b76cd 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -19,7 +19,6 @@ package edu.uci.ics.texera.web.resource.dashboard.user.workflow -import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSResourceType, VFSURIFactory} import edu.uci.ics.amber.core.tuple.Tuple import edu.uci.ics.amber.core.virtualidentity._ @@ -28,13 +27,12 @@ import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, Repla import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState} import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage import edu.uci.ics.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import edu.uci.ics.texera.auth.SessionUser import edu.uci.ics.texera.dao.SqlServer +import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.dao.jooq.generated.Tables._ import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions -import edu.uci.ics.texera.auth.SessionUser -import edu.uci.ics.texera.config.UserSystemConfig -import edu.uci.ics.texera.dao.SqlServer.withTransaction import edu.uci.ics.texera.web.model.http.request.result.ResultExportRequest import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._ import edu.uci.ics.texera.web.service.{ExecutionsMetadataPersistService, ResultExportService} @@ -107,19 +105,15 @@ object WorkflowExecutionsResource { globalPortId: GlobalPortIdentity, uri: URI ): Unit = { - if (UserSystemConfig.isUserSystemEnabled) { - context - .insertInto(OPERATOR_PORT_EXECUTIONS) - .columns( - OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID, - OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID, - OPERATOR_PORT_EXECUTIONS.RESULT_URI - ) - .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) - .execute() - } else { - ExecutionResourcesMapping.addResourceUri(eid, uri) - } + context + .insertInto(OPERATOR_PORT_EXECUTIONS) + .columns( + OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID, + OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID, + OPERATOR_PORT_EXECUTIONS.RESULT_URI + ) + .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) + .execute() } def insertOperatorExecutions( @@ -158,45 +152,37 @@ object WorkflowExecutionsResource { } def getResultUrisByExecutionId(eid: ExecutionIdentity): List[URI] = { - if (UserSystemConfig.isUserSystemEnabled) { - context - .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI) - .from(OPERATOR_PORT_EXECUTIONS) - .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) - .fetchInto(classOf[String]) - .asScala - .toList - .filter(uri => uri != null && uri.nonEmpty) - .map(URI.create) - } else { - ExecutionResourcesMapping.getResourceURIs(eid) - } + context + .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI) + .from(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .fetchInto(classOf[String]) + .asScala + .toList + .filter(uri => uri != null && uri.nonEmpty) + .map(URI.create) } def getConsoleMessagesUriByExecutionId(eid: ExecutionIdentity): List[URI] = - if (UserSystemConfig.isUserSystemEnabled) - context - .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI) - .from(OPERATOR_EXECUTIONS) - .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) - .fetchInto(classOf[String]) - .asScala - .toList - .filter(uri => uri != null && uri.nonEmpty) - .map(URI.create) - else Nil + context + .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI) + .from(OPERATOR_EXECUTIONS) + .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .fetchInto(classOf[String]) + .asScala + .toList + .filter(uri => uri != null && uri.nonEmpty) + .map(URI.create) def getRuntimeStatsUriByExecutionId(eid: ExecutionIdentity): Option[URI] = - if (UserSystemConfig.isUserSystemEnabled) - Option( - context - .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) - .from(WORKFLOW_EXECUTIONS) - .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt)) - .fetchOneInto(classOf[String]) - ).filter(_.nonEmpty) - .map(URI.create) - else None + Option( + context + .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt)) + .fetchOneInto(classOf[String]) + ).filter(_.nonEmpty) + .map(URI.create) def getWorkflowExecutions( wid: Integer, @@ -239,18 +225,14 @@ object WorkflowExecutionsResource { } def deleteConsoleMessageAndExecutionResultUris(eid: ExecutionIdentity): Unit = { - if (UserSystemConfig.isUserSystemEnabled) { - context - .delete(OPERATOR_PORT_EXECUTIONS) - .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) - .execute() - context - .delete(OPERATOR_EXECUTIONS) - .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) - .execute() - } else { - ExecutionResourcesMapping.removeExecutionResources(eid) - } + context + .delete(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .execute() + context + .delete(OPERATOR_EXECUTIONS) + .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .execute() } /** @@ -316,22 +298,20 @@ object WorkflowExecutionsResource { * @param eid Execution ID associated with the runtime statistics document. */ def updateRuntimeStatsSize(eid: ExecutionIdentity): Unit = { - if (UserSystemConfig.isUserSystemEnabled) { - val statsUriOpt = context - .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) - .from(WORKFLOW_EXECUTIONS) - .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt)) - .fetchOptionalInto(classOf[String]) - .map(URI.create) + val statsUriOpt = context + .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt)) + .fetchOptionalInto(classOf[String]) + .map(URI.create) - if (statsUriOpt.isPresent) { - val size = DocumentFactory.openDocument(statsUriOpt.get)._1.getTotalFileSize - context - .update(WORKFLOW_EXECUTIONS) - .set(WORKFLOW_EXECUTIONS.RUNTIME_STATS_SIZE, Integer.valueOf(size.toInt)) - .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt)) - .execute() - } + if (statsUriOpt.isPresent) { + val size = DocumentFactory.openDocument(statsUriOpt.get)._1.getTotalFileSize + context + .update(WORKFLOW_EXECUTIONS) + .set(WORKFLOW_EXECUTIONS.RUNTIME_STATS_SIZE, Integer.valueOf(size.toInt)) + .where(WORKFLOW_EXECUTIONS.EID.eq(eid.id.toInt)) + .execute() } } @@ -342,24 +322,22 @@ object WorkflowExecutionsResource { * @param opId Operator ID of the corresponding operator. */ def updateConsoleMessageSize(eid: ExecutionIdentity, opId: OperatorIdentity): Unit = { - if (UserSystemConfig.isUserSystemEnabled) { - val uriOpt = context - .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI) - .from(OPERATOR_EXECUTIONS) + val uriOpt = context + .select(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI) + .from(OPERATOR_EXECUTIONS) + .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id)) + .fetchOptionalInto(classOf[String]) + .map(URI.create) + + if (uriOpt.isPresent) { + val size = DocumentFactory.openDocument(uriOpt.get)._1.getTotalFileSize + context + .update(OPERATOR_EXECUTIONS) + .set(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_SIZE, Integer.valueOf(size.toInt)) .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id)) - .fetchOptionalInto(classOf[String]) - .map(URI.create) - - if (uriOpt.isPresent) { - val size = DocumentFactory.openDocument(uriOpt.get)._1.getTotalFileSize - context - .update(OPERATOR_EXECUTIONS) - .set(OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_SIZE, Integer.valueOf(size.toInt)) - .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) - .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq(opId.id)) - .execute() - } + .execute() } } @@ -368,7 +346,6 @@ object WorkflowExecutionsResource { * this method finds the URI for a globalPortId that both: 1. matches the logicalOpId and outputPortId, and * 2. is an external port. Currently the lookup is O(n), where n is the number of globalPortIds for this execution. * TODO: Optimize the lookup once the frontend also has information about physical operators. - * TODO: Remove the case of using ExecutionResourceMapping when user system is permenantly enabled even in dev mode. */ def getResultUriByLogicalPortId( eid: ExecutionIdentity, @@ -386,18 +363,14 @@ object WorkflowExecutionsResource { } val urisOfEid: List[URI] = - if (UserSystemConfig.isUserSystemEnabled) { - context - .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI) - .from(OPERATOR_PORT_EXECUTIONS) - .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) - .fetchInto(classOf[String]) - .asScala - .toList - .map(URI.create) - } else { - ExecutionResourcesMapping.getResourceURIs(eid) - } + context + .select(OPERATOR_PORT_EXECUTIONS.RESULT_URI) + .from(OPERATOR_PORT_EXECUTIONS) + .where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(eid.id.toInt)) + .fetchInto(classOf[String]) + .asScala + .toList + .map(URI.create) urisOfEid.find(isMatchingExternalPortURI) } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala index f69a1c81d3..929689fcf7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionConsoleService.scala @@ -139,8 +139,7 @@ class ExecutionConsoleService( private val consoleMessageOpIdToWriterMap: mutable.Map[String, BufferedItemWriter[Tuple]] = mutable.Map() - private val consoleWriterThread: Option[ExecutorService] = - Option.when(UserSystemConfig.isUserSystemEnabled)(Executors.newSingleThreadExecutor()) + private val consoleWriterThread: ExecutorService = Executors.newSingleThreadExecutor() private def getOrCreateWriter(opId: OperatorIdentity): BufferedItemWriter[Tuple] = { consoleMessageOpIdToWriterMap.getOrElseUpdate( @@ -261,21 +260,19 @@ class ExecutionConsoleService( consoleMessage: ConsoleMessage ): ExecutionConsoleStore = { // Write the original full message to the database - consoleWriterThread.foreach { thread => - thread.execute(() => { - val writer = getOrCreateWriter(OperatorIdentity(opId)) - try { - val tuple = new Tuple( - ResultSchema.consoleMessagesSchema, - Array(consoleMessage.toProtoString) - ) - writer.putOne(tuple) - } catch { - case e: Exception => - logger.error(s"Error while writing console message for operator $opId", e) - } - }) - } + consoleWriterThread.execute(() => { + val writer = getOrCreateWriter(OperatorIdentity(opId)) + try { + val tuple = new Tuple( + ResultSchema.consoleMessagesSchema, + Array(consoleMessage.toProtoString) + ) + writer.putOne(tuple) + } catch { + case e: Exception => + logger.error(s"Error while writing console message for operator $opId", e) + } + }) // Process the message (truncate if needed) and update store val truncatedMessage = processConsoleMessage(consoleMessage) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala index 6aabe7c4eb..9235fdaabf 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala @@ -72,30 +72,26 @@ class ExecutionStatsService( ) extends SubscriptionManager with LazyLogging { private val (metricsPersistThread, runtimeStatsWriter) = { - if (UserSystemConfig.isUserSystemEnabled) { - val thread = Executors.newSingleThreadExecutor() - val uri = VFSURIFactory.createRuntimeStatisticsURI( - workflowContext.workflowId, - workflowContext.executionId - ) - val writer = DocumentFactory - .createDocument(uri, ResultSchema.runtimeStatisticsSchema) - .writer("runtime_statistics") - .asInstanceOf[BufferedItemWriter[Tuple]] - WorkflowExecutionsResource.updateRuntimeStatsUri( - workflowContext.workflowId.id, - workflowContext.executionId.id, - uri - ) - writer.open() - (Some(thread), Some(writer)) - } else { - (None, None) - } + val thread = Executors.newSingleThreadExecutor() + val uri = VFSURIFactory.createRuntimeStatisticsURI( + workflowContext.workflowId, + workflowContext.executionId + ) + val writer = DocumentFactory + .createDocument(uri, ResultSchema.runtimeStatisticsSchema) + .writer("runtime_statistics") + .asInstanceOf[BufferedItemWriter[Tuple]] + WorkflowExecutionsResource.updateRuntimeStatsUri( + workflowContext.workflowId.id, + workflowContext.executionId.id, + uri + ) + writer.open() + (thread, writer) } - private var lastPersistedMetrics: Option[Map[String, OperatorMetrics]] = - Option.when(UserSystemConfig.isUserSystemEnabled)(Map.empty[String, OperatorMetrics]) + private var lastPersistedMetrics: Map[String, OperatorMetrics] = + Map.empty[String, OperatorMetrics] registerCallbacks() @@ -189,12 +185,10 @@ class ExecutionStatsService( stateStore.statsStore.updateState { statsStore => statsStore.withOperatorInfo(evt.operatorMetrics) } - metricsPersistThread.foreach { thread => - thread.execute(() => { - storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics)) - lastPersistedMetrics = Some(evt.operatorMetrics) - }) - } + metricsPersistThread.execute(() => { + storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics)) + lastPersistedMetrics = evt.operatorMetrics + }) }) ) } @@ -204,13 +198,11 @@ class ExecutionStatsService( case ExecutionStateUpdate(state: WorkflowAggregatedState.Recognized) if Set(COMPLETED, FAILED, KILLED).contains(state) => logger.info("Workflow execution terminated. Commit runtime statistics.") - runtimeStatsWriter.foreach { writer => - try { - writer.close() - } catch { - case e: Exception => - logger.error("Failed to close runtime statistics writer", e) - } + try { + runtimeStatsWriter.close() + } catch { + case e: Exception => + logger.error("Failed to close runtime statistics writer", e) } case _ => } @@ -225,15 +217,12 @@ class ExecutionStatsService( OperatorStatistics(Seq.empty, Seq.empty, 0, 0, 0, 0) ) - // Retrieve the last persisted metrics or default to an empty map - val lastMetrics = lastPersistedMetrics.getOrElse(Map.empty) - // Determine new and old keys - val newKeys = newMetrics.keySet.diff(lastMetrics.keySet) - val oldKeys = lastMetrics.keySet.diff(newMetrics.keySet) + val newKeys = newMetrics.keySet.diff(lastPersistedMetrics.keySet) + val oldKeys = lastPersistedMetrics.keySet.diff(newMetrics.keySet) // Update last metrics with default metrics for new keys - val updatedLastMetrics = lastMetrics ++ newKeys.map(_ -> defaultMetrics) + val updatedLastMetrics = lastPersistedMetrics ++ newKeys.map(_ -> defaultMetrics) // Combine new metrics with old metrics for keys that are no longer present val completeMetricsMap = newMetrics ++ oldKeys.map(key => key -> updatedLastMetrics(key)) @@ -258,34 +247,29 @@ class ExecutionStatsService( private def storeRuntimeStatistics( operatorStatistics: scala.collection.immutable.Map[String, OperatorMetrics] ): Unit = { - runtimeStatsWriter match { - case Some(writer) => - try { - operatorStatistics.foreach { - case (operatorId, stat) => - val runtimeStats = new Tuple( - ResultSchema.runtimeStatisticsSchema, - Array( - operatorId, - new java.sql.Timestamp(System.currentTimeMillis()), - stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum, - stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum, - stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum, - stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum, - stat.operatorStatistics.dataProcessingTime, - stat.operatorStatistics.controlProcessingTime, - stat.operatorStatistics.idleTime, - stat.operatorStatistics.numWorkers, - maptoStatusCode(stat.operatorState).toInt - ) - ) - writer.putOne(runtimeStats) - } - } catch { - case err: Throwable => logger.error("error occurred when storing runtime statistics", err) - } - case None => - logger.warn("Runtime statistics writer is not available.") + try { + operatorStatistics.foreach { + case (operatorId, stat) => + val runtimeStats = new Tuple( + ResultSchema.runtimeStatisticsSchema, + Array( + operatorId, + new java.sql.Timestamp(System.currentTimeMillis()), + stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum, + stat.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum, + stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum, + stat.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum, + stat.operatorStatistics.dataProcessingTime, + stat.operatorStatistics.controlProcessingTime, + stat.operatorStatistics.idleTime, + stat.operatorStatistics.numWorkers, + maptoStatusCode(stat.operatorState).toInt + ) + ) + runtimeStatsWriter.putOne(runtimeStats) + } + } catch { + case err: Throwable => logger.error("error occurred when storing runtime statistics", err) } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala index c1f49a8470..16f7e3cd7a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionsMetadataPersistService.scala @@ -57,7 +57,6 @@ object ExecutionsMetadataPersistService extends LazyLogging { environmentVersion: String, computingUnitId: Integer ): ExecutionIdentity = { - if (!UserSystemConfig.isUserSystemEnabled) return DEFAULT_EXECUTION_ID // first retrieve the latest version of this workflow val vid = getLatestVersion(workflowId.id.toInt) val newExecution = new WorkflowExecutions() @@ -77,7 +76,6 @@ object ExecutionsMetadataPersistService extends LazyLogging { } def tryGetExistingExecution(executionId: ExecutionIdentity): Option[WorkflowExecutions] = { - if (!UserSystemConfig.isUserSystemEnabled) return None try { Some(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)) } catch { @@ -90,7 +88,6 @@ object ExecutionsMetadataPersistService extends LazyLogging { def tryUpdateExistingExecution( executionId: ExecutionIdentity )(updateFunc: WorkflowExecutions => Unit): Unit = { - if (!UserSystemConfig.isUserSystemEnabled) return try { val execution = workflowExecutionsDao.fetchOneByEid(executionId.id.toInt) updateFunc(execution) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala index b6c8cbb88e..4430180d4b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala @@ -50,9 +50,6 @@ object WorkflowExecutionService { workflowId: WorkflowIdentity, computingUnitId: Int ): Option[ExecutionIdentity] = { - if (!UserSystemConfig.isUserSystemEnabled) { - return Some(DEFAULT_EXECUTION_ID) - } WorkflowExecutionsResource .getLatestExecutionID(workflowId.id.toInt, computingUnitId) .map(eid => new ExecutionIdentity(eid.longValue())) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala index 1fee982fde..a5f47e1153 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowService.scala @@ -207,35 +207,32 @@ class WorkflowService( req.computingUnitId ) - if (UserSystemConfig.isUserSystemEnabled) { - // enable only if we have mysql - if (ApplicationConfig.faultToleranceLogRootFolder.isDefined) { - val writeLocation = ApplicationConfig.faultToleranceLogRootFolder.get.resolve( - s"${workflowContext.workflowId}/${workflowContext.executionId}/" - ) - ExecutionsMetadataPersistService.tryUpdateExistingExecution(workflowContext.executionId) { - execution => execution.setLogLocation(writeLocation.toString) - } - controllerConf = controllerConf.copy(faultToleranceConfOpt = - Some(FaultToleranceConfig(writeTo = writeLocation)) - ) + if (ApplicationConfig.faultToleranceLogRootFolder.isDefined) { + val writeLocation = ApplicationConfig.faultToleranceLogRootFolder.get.resolve( + s"${workflowContext.workflowId}/${workflowContext.executionId}/" + ) + ExecutionsMetadataPersistService.tryUpdateExistingExecution(workflowContext.executionId) { + execution => execution.setLogLocation(writeLocation.toString) } - if (req.replayFromExecution.isDefined) { - val replayInfo = req.replayFromExecution.get - ExecutionsMetadataPersistService - .tryGetExistingExecution(ExecutionIdentity(replayInfo.eid)) - .foreach { execution => - val readLocation = new URI(execution.getLogLocation) - controllerConf = controllerConf.copy(stateRestoreConfOpt = - Some( - StateRestoreConfig( - readFrom = readLocation, - replayDestination = EmbeddedControlMessageIdentity(replayInfo.interaction) - ) + controllerConf = controllerConf.copy(faultToleranceConfOpt = + Some(FaultToleranceConfig(writeTo = writeLocation)) + ) + } + if (req.replayFromExecution.isDefined) { + val replayInfo = req.replayFromExecution.get + ExecutionsMetadataPersistService + .tryGetExistingExecution(ExecutionIdentity(replayInfo.eid)) + .foreach { execution => + val readLocation = new URI(execution.getLogLocation) + controllerConf = controllerConf.copy(stateRestoreConfOpt = + Some( + StateRestoreConfig( + readFrom = readLocation, + replayDestination = EmbeddedControlMessageIdentity(replayInfo.interaction) ) ) - } - } + ) + } } val executionStateStore = new ExecutionStateStore() diff --git a/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala b/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala index 3270e39af1..269001e6a3 100644 --- a/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala +++ b/core/config-service/src/main/scala/edu/uci/ics/texera/service/resource/ConfigResource.scala @@ -65,7 +65,6 @@ class ConfigResource { def getUserSystemConfig: Map[String, Any] = Map( // flags from the user-system.conf - "inviteOnly" -> UserSystemConfig.inviteOnly, - "userSystemEnabled" -> UserSystemConfig.isUserSystemEnabled + "inviteOnly" -> UserSystemConfig.inviteOnly ) } diff --git a/core/config/src/main/resources/user-system.conf b/core/config/src/main/resources/user-system.conf index 72f5e23d23..166ce2fc8f 100644 --- a/core/config/src/main/resources/user-system.conf +++ b/core/config/src/main/resources/user-system.conf @@ -17,9 +17,6 @@ # See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines. user-sys { - enabled = true - enabled = ${?USER_SYS_ENABLED} - admin-username = "texera" admin-username = ${?USER_SYS_ADMIN_USERNAME} diff --git a/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala b/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala index d64ee7ff4c..eb7b91ad6e 100644 --- a/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala +++ b/core/config/src/main/scala/edu/uci/ics/texera/config/UserSystemConfig.scala @@ -27,7 +27,6 @@ object UserSystemConfig { private val logger = Logger.getLogger(getClass.getName) // User system - val isUserSystemEnabled: Boolean = conf.getBoolean("user-sys.enabled") val adminUsername: String = conf.getString("user-sys.admin-username") val adminPassword: String = conf.getString("user-sys.admin-password") val googleClientId: String = conf.getString("user-sys.google.clientId") diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala deleted file mode 100644 index faf7183473..0000000000 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ExecutionResourcesMapping.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package edu.uci.ics.amber.core.storage.result - -import edu.uci.ics.amber.core.virtualidentity.ExecutionIdentity - -import java.net.URI -import scala.collection.mutable - -/** - * ExecutionResourcesMapping is a singleton for keeping track of resources associated with each execution. - * It maintains a mapping from execution ID to a list of URIs, which point to resources like the result storage. - * - * Currently, this mapping is only used during the resource clean-up phase. - * - * This design has one limitation: the singleton is only accessible on the master node. - * While this aligns with the current system design, improvements are needed in the - * future to enhance scalability and flexibility. - * - * TODO: Move the mappings to an external, distributed, and persistent location to eliminate the master-node - * dependency. - */ -object ExecutionResourcesMapping { - - private val executionIdToExecutionResourcesMapping: mutable.Map[ExecutionIdentity, List[URI]] = - mutable.Map.empty - - /** - * Get the URIs of given execution Id - * @param executionIdentity the target execution id - * @return - */ - def getResourceURIs(executionIdentity: ExecutionIdentity): List[URI] = { - executionIdToExecutionResourcesMapping.getOrElseUpdate(executionIdentity, List()) - } - - /** - * Add the URI to the mapping - * @param executionIdentity the target execution - * @param uri the URI of the resource - */ - def addResourceUri(executionIdentity: ExecutionIdentity, uri: URI): Unit = { - executionIdToExecutionResourcesMapping.updateWith(executionIdentity) { - case Some(existingUris) => Some(uri :: existingUris) // Prepend URI to the existing list - case None => Some(List(uri)) // Create a new list if key doesn't exist - } - } - - /** - * Remove all resources associated with a given execution ID. - * - * @param executionIdentity the target execution ID - * @return true if the entry was removed, false if it did not exist - */ - def removeExecutionResources(executionIdentity: ExecutionIdentity): Boolean = { - executionIdToExecutionResourcesMapping.remove(executionIdentity).isDefined - } -}
