This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 5b3cabff46 chore: remove deprecated result storage flag (#4210)
5b3cabff46 is described below
commit 5b3cabff460aa0e87703873df8e158fcf0709caa
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Feb 26 22:57:07 2026 -0800
chore: remove deprecated result storage flag (#4210)
---
.../websocket/event/WebResultUpdateEvent.scala | 3 +-
.../web/service/ExecutionResultService.scala | 5 +-
common/config/src/main/resources/storage.conf | 2 -
.../amber/config/EnvironmentalVariable.scala | 5 -
.../apache/texera/amber/config/StorageConfig.scala | 3 -
.../amber/core/storage/DocumentFactory.scala | 102 +++++++++------------
.../result-table-frame.component.ts | 17 +---
.../workflow-result/workflow-result.service.ts | 12 +--
.../workspace/types/execute-workflow.interface.ts | 1 -
9 files changed, 50 insertions(+), 100 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala
index 5689eaafbc..7e31be1ca7 100644
---
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala
+++
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/WebResultUpdateEvent.scala
@@ -23,6 +23,5 @@ import
org.apache.texera.web.service.ExecutionResultService.WebResultUpdate
case class WebResultUpdateEvent(
updates: Map[String, WebResultUpdate],
- tableStats: Map[String, Map[String, Map[String, Any]]],
- resultStorageMode: String
+ tableStats: Map[String, Map[String, Map[String, Any]]]
) extends TexeraWebSocketEvent
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
index 8810e9891f..285c836b60 100644
---
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
+++
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
@@ -382,7 +382,7 @@ class ExecutionResultService(
outputPort.mode == OutputMode.SINGLE_SNAPSHOT
}
- if (StorageConfig.resultStorageMode == ICEBERG &&
!hasSingleSnapshot) {
+ if (!hasSingleSnapshot) {
val storageUri = WorkflowExecutionsResource
.getResultUriByLogicalPortId(
executionId,
@@ -408,8 +408,7 @@ class ExecutionResultService(
Iterable(
WebResultUpdateEvent(
buf.toMap,
- allTableStats.toMap,
- StorageConfig.resultStorageMode.toLowerCase
+ allTableStats.toMap
)
)
})
diff --git a/common/config/src/main/resources/storage.conf
b/common/config/src/main/resources/storage.conf
index 85a62b77a3..276d1491cd 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -17,8 +17,6 @@
# See PR https://github.com/Texera/texera/pull/3326 for configuration
guidelines.
storage {
- result-storage-mode = iceberg # either mongodb or iceberg, mongodb will be
deprecated soon
- result-storage-mode = ${?STORAGE_RESULT_MODE}
# Configuration for Apache Iceberg, used for storing the workflow results
& stats
iceberg {
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
index 099e12260d..1adc323305 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala
@@ -44,11 +44,6 @@ object EnvironmentalVariable {
*/
val ENV_USER_JWT_TOKEN = "USER_JWT_TOKEN"
- /**
- * Variables in storage.conf
- */
- val ENV_RESULT_STORAGE_MODE = "STORAGE_RESULT_MODE"
-
// JDBC
val ENV_JDBC_URL = "STORAGE_JDBC_URL"
val ENV_JDBC_USERNAME = "STORAGE_JDBC_USERNAME"
diff --git
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index c5bd330286..3bc1e05a9b 100644
---
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -28,9 +28,6 @@ object StorageConfig {
// Load configuration
private val conf: Config =
ConfigFactory.parseResources("storage.conf").resolve()
- // General storage settings
- val resultStorageMode: String = conf.getString("storage.result-storage-mode")
-
// JDBC specifics
val jdbcUrl: String = conf.getString("storage.jdbc.url")
val jdbcUrlForTestCases: String =
conf.getString("storage.jdbc.url-for-test-cases")
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 4c37c33bb2..15949ef471 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -76,32 +76,25 @@ object DocumentFactory {
throw new IllegalArgumentException(s"Resource type $resourceType
is not supported")
}
- StorageConfig.resultStorageMode.toLowerCase match {
- case ICEBERG =>
- val icebergSchema = IcebergUtil.toIcebergSchema(schema)
- IcebergUtil.createTable(
- IcebergCatalogInstance.getInstance(),
- namespace,
- storageKey,
- icebergSchema,
- overrideIfExists = true
- )
- val serde: (IcebergSchema, Tuple) => Record =
IcebergUtil.toGenericRecord
- val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
- IcebergUtil.fromRecord(record,
IcebergUtil.fromIcebergSchema(schema))
-
- new IcebergDocument[Tuple](
- namespace,
- storageKey,
- icebergSchema,
- serde,
- deserde
- )
- case unsupportedMode =>
- throw new IllegalArgumentException(
- s"Storage mode '$unsupportedMode' is not supported"
- )
- }
+ val icebergSchema = IcebergUtil.toIcebergSchema(schema)
+ IcebergUtil.createTable(
+ IcebergCatalogInstance.getInstance(),
+ namespace,
+ storageKey,
+ icebergSchema,
+ overrideIfExists = true
+ )
+ val serde: (IcebergSchema, Tuple) => Record =
IcebergUtil.toGenericRecord
+ val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
+ IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema))
+
+ new IcebergDocument[Tuple](
+ namespace,
+ storageKey,
+ icebergSchema,
+ serde,
+ deserde
+ )
case unsupportedScheme =>
throw new UnsupportedOperationException(
s"Unsupported URI scheme: $unsupportedScheme for creating the
document"
@@ -130,38 +123,31 @@ object DocumentFactory {
throw new IllegalArgumentException(s"Resource type $resourceType
is not supported")
}
- StorageConfig.resultStorageMode.toLowerCase match {
- case ICEBERG =>
- val table = IcebergUtil
- .loadTableMetadata(
- IcebergCatalogInstance.getInstance(),
- namespace,
- storageKey
- )
- .getOrElse(
- throw new IllegalArgumentException("No storage is found for
the given URI")
- )
-
- val amberSchema = IcebergUtil.fromIcebergSchema(table.schema())
- val serde: (IcebergSchema, Tuple) => Record =
IcebergUtil.toGenericRecord
- val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
- IcebergUtil.fromRecord(record,
IcebergUtil.fromIcebergSchema(schema))
-
- (
- new IcebergDocument[Tuple](
- namespace,
- storageKey,
- table.schema(),
- serde,
- deserde
- ),
- Some(amberSchema)
- )
- case mode =>
- throw new IllegalArgumentException(
- s"Storage mode '$mode' is not supported"
- )
- }
+ val table = IcebergUtil
+ .loadTableMetadata(
+ IcebergCatalogInstance.getInstance(),
+ namespace,
+ storageKey
+ )
+ .getOrElse(
+ throw new IllegalArgumentException("No storage is found for the
given URI")
+ )
+
+ val amberSchema = IcebergUtil.fromIcebergSchema(table.schema())
+ val serde: (IcebergSchema, Tuple) => Record =
IcebergUtil.toGenericRecord
+ val deserde: (IcebergSchema, Record) => Tuple = (schema, record) =>
+ IcebergUtil.fromRecord(record, IcebergUtil.fromIcebergSchema(schema))
+
+ (
+ new IcebergDocument[Tuple](
+ namespace,
+ storageKey,
+ table.schema(),
+ serde,
+ deserde
+ ),
+ Some(amberSchema)
+ )
case unsupportedScheme =>
throw new UnsupportedOperationException(
s"Unsupported URI scheme: $unsupportedScheme for opening the
document"
diff --git
a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts
b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts
index abb6daa882..383c6daa71 100644
---
a/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts
+++
b/frontend/src/app/workspace/component/result-panel/result-table-frame/result-table-frame.component.ts
@@ -29,7 +29,6 @@ import { RowModalComponent } from
"../result-panel-modal.component";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { DomSanitizer, SafeHtml } from "@angular/platform-browser";
import { ResultExportationComponent } from
"../../result-exportation/result-exportation.component";
-import { SchemaAttribute } from "../../../types/workflow-compiling.interface";
import { WorkflowStatusService } from
"../../../service/workflow-status/workflow-status.service";
import { GuiConfigService } from
"../../../../common/service/gui-config.service";
@@ -74,8 +73,6 @@ export class ResultTableFrameComponent implements OnInit,
OnChanges {
tableStats: Record<string, Record<string, number>> = {};
prevTableStats: Record<string, Record<string, number>> = {};
widthPercent: string = "";
- sinkStorageMode: string = "";
- private schema: ReadonlyArray<SchemaAttribute> = [];
isOperatorFinished: boolean = false;
constructor(
@@ -101,7 +98,6 @@ export class ResultTableFrameComponent implements OnInit,
OnChanges {
this.tableStats = paginatedResultService.getStats();
this.prevTableStats = this.tableStats;
- this.schema = paginatedResultService.getSchema();
}
}
}
@@ -160,13 +156,6 @@ export class ResultTableFrameComponent implements OnInit,
OnChanges {
}
});
- this.workflowResultService
- .getSinkStorageMode()
- .pipe(untilDestroyed(this))
- .subscribe(sinkStorageMode => {
- this.sinkStorageMode = sinkStorageMode;
- });
-
this.resizeService.currentSize.pipe(untilDestroyed(this)).subscribe(size
=> {
this.panelHeight = size.height;
this.adjustPageSizeBasedOnPanelSize(size.height);
@@ -179,7 +168,6 @@ export class ResultTableFrameComponent implements OnInit,
OnChanges {
if (this.operatorId) {
const paginatedResultService =
this.workflowResultService.getPaginatedResultService(this.operatorId);
if (paginatedResultService) {
- this.schema = paginatedResultService.getSchema();
}
}
}
@@ -207,8 +195,8 @@ export class ResultTableFrameComponent implements OnInit,
OnChanges {
compare(field: string, stats: string): SafeHtml {
let current = this.tableStats[field][stats];
let previous = this.prevTableStats[field][stats];
- let currentStr = "";
- let previousStr = "";
+ let currentStr: string;
+ let previousStr: string;
if (typeof current === "number" && typeof previous === "number") {
currentStr = current.toFixed(2);
@@ -370,7 +358,6 @@ export class ResultTableFrameComponent implements OnInit,
OnChanges {
.subscribe(pageData => {
if (this.currentPageIndex === pageData.pageIndex) {
this.setupResultTable(pageData.table,
paginatedResultService.getCurrentTotalNumTuples());
- this.schema = pageData.schema;
this.changeDetectorRef.detectChanges();
}
});
diff --git
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
index 96937ccbec..9fd18e0f16 100644
---
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
+++
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
@@ -29,7 +29,7 @@ import {
} from "../../types/execute-workflow.interface";
import { WorkflowWebsocketService } from
"../workflow-websocket/workflow-websocket.service";
import { PaginatedResultEvent, WorkflowAvailableResultEvent } from
"../../types/workflow-websocket.interface";
-import { BehaviorSubject, map, Observable, of, pairwise, ReplaySubject,
Subject } from "rxjs";
+import { map, Observable, of, pairwise, ReplaySubject, Subject } from "rxjs";
import { v4 as uuid } from "uuid";
import { IndexableObject } from "../../types/result-table.interface";
import { isDefined } from "../../../common/util/predicate";
@@ -49,13 +49,11 @@ export class WorkflowResultService {
private resultUpdateStream = new Subject<Record<string, WebResultUpdate |
undefined>>();
private resultTableStats = new ReplaySubject<Record<string, Record<string,
Record<string, number>>>>(1);
private resultInitiateStream = new Subject<string>();
- private sinkStorageModeSubject = new BehaviorSubject<string>("");
constructor(private wsService: WorkflowWebsocketService) {
this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event =>
{
this.handleResultUpdate(event.updates);
this.handleTableStatsUpdate(event.tableStats);
- this.handleSinkStorageModeUpdate(event.sinkStorageMode);
});
this.wsService
.subscribeToEvent("WorkflowAvailableResultEvent")
@@ -165,14 +163,6 @@ export class WorkflowResultService {
this.resultTableStats.next(event);
}
- private handleSinkStorageModeUpdate(sinkStorageMode: string): void {
- this.sinkStorageModeSubject.next(sinkStorageMode);
- }
-
- public getSinkStorageMode(): BehaviorSubject<string> {
- return this.sinkStorageModeSubject;
- }
-
private getOrInitPaginatedResultService(operatorID: string):
OperatorPaginationResultService {
let service = this.getPaginatedResultService(operatorID);
if (!service) {
diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts
b/frontend/src/app/workspace/types/execute-workflow.interface.ts
index 23ade23199..1633e4bfdf 100644
--- a/frontend/src/app/workspace/types/execute-workflow.interface.ts
+++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts
@@ -120,7 +120,6 @@ export interface WorkflowResultUpdateEvent
extends Readonly<{
updates: WorkflowResultUpdate;
tableStats: WorkflowResultTableStats;
- sinkStorageMode: string;
}> {}
// user-defined type guards to check the type of the result update