This is an automated email from the ASF dual-hosted git repository.

chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 253409a6ba refactor(dataset): Redirect multipart upload through File Service (#4136)
253409a6ba is described below

commit 253409a6ba8c07f573aa0dede1ea747ad6b2c97e
Author: carloea2 <[email protected]>
AuthorDate: Mon Jan 5 09:11:15 2026 -0800

    refactor(dataset): Redirect multipart upload through File Service (#4136)
    
    ### What changes were proposed in this PR?
    
    * **DB / schema**
    
      * Add `dataset_upload_session` to track multipart upload sessions, including:
    
        * `(uid, did, file_path)` as the primary key
        * `upload_id` (**UNIQUE**), `physical_address`
        * **`num_parts_requested`** to enforce the expected part count
    
      * Add `dataset_upload_session_part` to track per-part completion for a multipart upload:
    
        * `(upload_id, part_number)` as the primary key
        * `etag` (`TEXT NOT NULL DEFAULT ''`) to persist per-part ETags for finalize
        * `CHECK (part_number > 0)` for sanity
        * `FOREIGN KEY (upload_id) REFERENCES dataset_upload_session(upload_id) ON DELETE CASCADE`
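    
    For context, a condensed sketch of how `type=init` pre-populates `dataset_upload_session_part` (adapted from the `DatasetResource` changes in this diff; the helper name is illustrative, and permission checks, the LakeFS call, and error handling are omitted):
    
    ```scala
    import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
    import org.jooq.DSLContext
    import org.jooq.impl.DSL
    
    // Insert one placeholder row per expected part; an empty ETag means
    // "this part has not been uploaded yet".
    def preCreatePartRows(ctx: DSLContext, uploadIdStr: String, numPartsValue: Int): Unit = {
      // generate_series(1, numParts) produces the part numbers 1..numParts in SQL
      val partNumberSeries = DSL.generateSeries(1, numPartsValue).asTable("gs", "pn")
      val partNumberField = partNumberSeries.field("pn", classOf[Integer])
    
      ctx
        .insertInto(
          DATASET_UPLOAD_SESSION_PART,
          DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
          DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
          DATASET_UPLOAD_SESSION_PART.ETAG
        )
        .select(
          ctx
            .select(DSL.inline(uploadIdStr), partNumberField, DSL.inline("")) // '' = placeholder ETag
            .from(partNumberSeries)
        )
        .execute()
    }
    ```
    
    Pre-creating the rows up front lets `uploadPart` lock an existing `(upload_id, part_number)` row instead of inserting on demand, which keeps the per-part locking deterministic.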
    
    * **Backend (`DatasetResource`)**
    
      * Multipart upload API (server-side streaming to S3; LakeFS manages the multipart state):
    
        * `POST /dataset/multipart-upload?type=init`
    
          * Validates permissions and input.
          * Creates a LakeFS multipart upload session.
          * Inserts a DB session row, including `num_parts_requested`.
          * **Pre-creates placeholder rows** in `dataset_upload_session_part` for part numbers `1..num_parts_requested` with `etag = ''` (enables deterministic per-part locking and simple completeness checks); see the sketch above.
          * **Rejects init if a session already exists** for `(uid, did, file_path)` (409 Conflict). The race is handled via PK/duplicate handling plus a best-effort LakeFS abort for the losing initializer.
    
        * `POST /dataset/multipart-upload/part?filePath=...&partNumber=...`
    
          * Requires dataset write access and an existing upload session.
          * **Requires `Content-Length`** for streaming uploads.
          * Enforces `partNumber <= num_parts_requested`.
          * **Per-part locking**: locks the `(upload_id, part_number)` row using `SELECT … FOR UPDATE NOWAIT` to prevent concurrent uploads of the same part (see the sketch after this list).
          * Uploads the part to S3 and **persists the returned ETag** into `dataset_upload_session_part.etag` (upsert/overwrite for retries).
          * Implements idempotency for retries by returning success if the ETag is already present for that part.
    
        * `POST /dataset/multipart-upload?type=finish`
    
          * Locks the session row using `SELECT … FOR UPDATE NOWAIT` to prevent concurrent finalize/abort.
          * Validates completeness using DB state:
    
            * Confirms the part table has `num_parts_requested` rows for the `upload_id`.
            * Confirms **all parts have non-empty ETags** (no missing parts).
            * Optionally surfaces a bounded list of missing part numbers (without relying on error-message asserts in tests).
    
          * Fetches `(part_number, etag)` ordered by `part_number` from the DB and completes the multipart upload via LakeFS.
          * Deletes the DB session row; part rows are cleaned up via `ON DELETE CASCADE`.
          * **NOWAIT lock contention is handled** (mapped to “already being finalized/aborted”, 409).
    
        * `POST /dataset/multipart-upload?type=abort`
    
          * Locks the session row using `SELECT … FOR UPDATE NOWAIT`.
          * Aborts the multipart upload via LakeFS and deletes the DB session row (parts cascade-delete).
          * **NOWAIT lock contention is handled** similarly to `finish`.
    
      * Access control and dataset permissions remain enforced on all endpoints.
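    
    For reference, a condensed sketch of the per-part locking pattern used by the `part` endpoint (adapted from the `uploadPart` handler in this diff; the helper name is illustrative). PostgreSQL raises SQLSTATE `55P03` when `NOWAIT` cannot acquire the row lock, and that is surfaced as a 409 instead of blocking:
    
    ```scala
    import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
    import org.jooq.DSLContext
    import org.jooq.exception.DataAccessException
    import jakarta.ws.rs.WebApplicationException
    import jakarta.ws.rs.core.Response
    import java.sql.SQLException
    
    // Lock the (upload_id, part_number) row; fail fast with 409 if another
    // request is already streaming the same part.
    def lockPartRowOrConflict(ctx: DSLContext, uploadId: String, partNumber: Int) =
      try {
        ctx
          .selectFrom(DATASET_UPLOAD_SESSION_PART)
          .where(
            DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
              .eq(uploadId)
              .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(partNumber))
          )
          .forUpdate()
          .noWait()
          .fetchOne()
      } catch {
        case e: DataAccessException
            if Option(e.getCause)
              .collect { case s: SQLException => s.getSQLState }
              .contains("55P03") =>
          throw new WebApplicationException(
            s"Part $partNumber is already being uploaded",
            Response.Status.CONFLICT
          )
      }
    ```
    
    The same `FOR UPDATE NOWAIT` plus 409 mapping is applied to the session row in `finish` and `abort`.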
    
    * **Frontend service (`dataset.service.ts`)**
    
      * `multipartUpload(...)` updated to reflect the server flow and return values (ETag persistence is server-side; the frontend does not need to track ETags).
    
    * **Frontend component (`dataset-detail.component.ts`)**
    
      * Uses the same init/part/finish flow.
      * Abort triggers the backend `type=abort` to clean up the upload session.
    
    ---
    
    ### Any related issues, documentation, discussions?
    
    Closes #4110
    
    ---
    
    ### How was this PR tested?
    
    * **Unit tests added/updated** (multipart upload spec):
    
      * Init validation (invalid numParts, invalid filePath, permission denied).
      * Upload part validation (missing/invalid Content-Length, partNumber bounds, minimum size enforcement for non-final parts).
      * **Per-part lock behavior** under contention (no concurrent streams for the same part; deterministic assertions).
      * Finish/abort locking behavior (NOWAIT contention returns 409).
      * Successful end-to-end path (init → upload parts → finish) with DB cleanup assertions.
      * **Integrity checks**: positive and negative SHA-256 tests that download the finalized object and verify it matches (or does not match) the expected concatenated bytes; see the digest helper at the end of this section.
    
    * Manual testing via the dataset detail page (single and multiple uploads), verified:
    
      * Progress, speed, and ETA updates.
      * Abort behavior (UI state + DB session cleanup).
      * Successful completion path (all expected parts uploaded, LakeFS object present, dataset version creation works).
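    
    The integrity check reduces to comparing SHA-256 digests: the expected digest is computed over the uploaded chunks in part order and compared against the digest of the object downloaded after `finish`. The digest helper below is the one added in `DatasetResourceSpec`:
    
    ```scala
    import java.security.MessageDigest
    
    // SHA-256 over the chunks in part order (matches the spec's sha256OfChunks helper).
    def sha256OfChunks(chunks: Seq[Array[Byte]]): Array[Byte] = {
      val messageDigest = MessageDigest.getInstance("SHA-256")
      chunks.foreach(messageDigest.update)
      messageDigest.digest()
    }
    ```
    
    The positive test asserts the two digests match; the negative test asserts they differ.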
    
    ---
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    GPT was used for parts of this PR.
    
    ---------
    
    Co-authored-by: Chen Li <[email protected]>
---
 .../core/storage/util/LakeFSStorageClient.scala    |   44 +
 .../texera/service/util/S3StorageClient.scala      |   55 +
 .../texera/service/resource/DatasetResource.scala  |  652 +++++++++--
 .../org/apache/texera/service/MockLakeFS.scala     |   56 +-
 .../service/resource/DatasetResourceSpec.scala     | 1185 +++++++++++++++++++-
 .../dataset-detail.component.ts                    |  231 ++--
 .../service/user/dataset/dataset.service.ts        |  276 ++---
 sql/texera_ddl.sql                                 |   33 +
 sql/updates/17.sql                                 |   66 ++
 9 files changed, 2220 insertions(+), 378 deletions(-)

diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
index 63c09f4c30..d01e820259 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
@@ -25,6 +25,7 @@ import io.lakefs.clients.sdk.model._
 import org.apache.texera.amber.config.StorageConfig
 
 import java.io.{File, FileOutputStream, InputStream}
+import java.net.URI
 import java.nio.file.Files
 import scala.jdk.CollectionConverters._
 
@@ -358,4 +359,47 @@ object LakeFSStorageClient {
 
     branchesApi.resetBranch(repoName, branchName, resetCreation).execute()
   }
+
+  /**
+    * Parse a physical address URI of the form "<scheme>://<bucket>/<key...>" into (bucket, key).
+    *
+    * Expected examples:
+    *   - "s3://my-bucket/path/to/file.csv"
+    *   - "gs://my-bucket/some/prefix/data.json"
+    *
+    * @param address URI string in the form "<scheme>://<bucket>/<key...>"
+    * @return (bucket, key) where key does not start with "/"
+    * @throws IllegalArgumentException
+    *   if the address is empty, not a valid URI, missing bucket/host, or missing key/path
+    */
+  def parsePhysicalAddress(address: String): (String, String) = {
+    val raw = Option(address).getOrElse("").trim
+    if (raw.isEmpty)
+      throw new IllegalArgumentException("Address is empty (expected 
'<scheme>://<bucket>/<key>')")
+
+    val uri =
+      try new URI(raw)
+      catch {
+        case e: Exception =>
+          throw new IllegalArgumentException(
+            s"Invalid address URI: '$raw' (expected 
'<scheme>://<bucket>/<key>')",
+            e
+          )
+      }
+
+    val bucket = Option(uri.getHost).getOrElse("").trim
+    if (bucket.isEmpty)
+      throw new IllegalArgumentException(
+        s"Invalid address: missing host/bucket in '$raw' (expected 
'<scheme>://<bucket>/<key>')"
+      )
+
+    val key = Option(uri.getPath).getOrElse("").stripPrefix("/").trim
+    if (key.isEmpty)
+      throw new IllegalArgumentException(
+        s"Invalid address: missing key/path in '$raw' (expected 
'<scheme>://<bucket>/<key>')"
+      )
+
+    (bucket, key)
+  }
+
 }
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index 94007e988e..b7a66a1bc8 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -259,4 +259,59 @@ object S3StorageClient {
       DeleteObjectRequest.builder().bucket(bucketName).key(objectKey).build()
     )
   }
+
+  /**
+    * Uploads a single part for an in-progress S3 multipart upload.
+    *
+    * This method wraps the AWS SDK v2 {@code UploadPart} API:
+    * it builds an {@link software.amazon.awssdk.services.s3.model.UploadPartRequest}
+    * and streams the part payload via a {@link software.amazon.awssdk.core.sync.RequestBody}.
+    *
+    * Payload handling:
+    *   - If {@code contentLength} is provided, the payload is streamed directly from {@code inputStream}
+    *     using {@code RequestBody.fromInputStream(inputStream, len)}.
+    *   - If {@code contentLength} is {@code None}, the entire {@code inputStream} is read into memory
+    *     ({@code readAllBytes}) and uploaded using {@code RequestBody.fromBytes(bytes)}.
+    *     This is convenient but can be memory-expensive for large parts; prefer providing a known length.
+    *
+    * Notes:
+    *   - {@code partNumber} must be in the valid S3 range (typically 1..10,000).
+    *   - The caller is responsible for closing {@code inputStream}.
+    *   - This method is synchronous and will block the calling thread until the upload completes.
+    *
+    * @param bucket        S3 bucket name.
+    * @param key           Object key (path) being uploaded.
+    * @param uploadId      Multipart upload identifier returned by CreateMultipartUpload.
+    * @param partNumber    1-based part number for this upload.
+    * @param inputStream   Stream containing the bytes for this part.
+    * @param contentLength Optional size (in bytes) of this part; provide it to avoid buffering in memory.
+    * @return              The {@link software.amazon.awssdk.services.s3.model.UploadPartResponse},
+    *                      including the part ETag used for completing the multipart upload.
+    */
+  def uploadPartWithRequest(
+      bucket: String,
+      key: String,
+      uploadId: String,
+      partNumber: Int,
+      inputStream: InputStream,
+      contentLength: Option[Long]
+  ): UploadPartResponse = {
+    val requestBody: RequestBody = contentLength match {
+      case Some(len) => RequestBody.fromInputStream(inputStream, len)
+      case None =>
+        val bytes = inputStream.readAllBytes()
+        RequestBody.fromBytes(bytes)
+    }
+
+    val req = UploadPartRequest
+      .builder()
+      .bucket(bucket)
+      .key(key)
+      .uploadId(uploadId)
+      .partNumber(partNumber)
+      .build()
+
+    s3Client.uploadPart(req, requestBody)
+  }
+
 }
diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index 023c4ffc88..44ce22dfb1 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -54,7 +54,8 @@ import org.apache.texera.service.util.S3StorageClient.{
   MINIMUM_NUM_OF_MULTIPART_S3_PART
 }
 import org.jooq.{DSLContext, EnumType}
-
+import org.jooq.impl.DSL
+import org.jooq.impl.DSL.{inline => inl}
 import java.io.{InputStream, OutputStream}
 import java.net.{HttpURLConnection, URL, URLDecoder}
 import java.nio.charset.StandardCharsets
@@ -65,6 +66,13 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
+import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
+import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
+import org.jooq.exception.DataAccessException
+import software.amazon.awssdk.services.s3.model.UploadPartResponse
+
+import java.sql.SQLException
+import scala.util.Try
 
 object DatasetResource {
 
@@ -89,11 +97,11 @@ object DatasetResource {
     */
   private def put(buf: Array[Byte], len: Int, url: String, partNum: Int): String = {
     val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
-    conn.setDoOutput(true);
+    conn.setDoOutput(true)
     conn.setRequestMethod("PUT")
     conn.setFixedLengthStreamingMode(len)
     val out = conn.getOutputStream
-    out.write(buf, 0, len);
+    out.write(buf, 0, len)
     out.close()
 
     val code = conn.getResponseCode
@@ -401,7 +409,6 @@ class DatasetResource {
             e
           )
       }
-
       // delete the directory on S3
       if (
         S3StorageClient.directoryExists(StorageConfig.lakefsBucketName, 
dataset.getRepositoryName)
@@ -639,138 +646,173 @@ class DatasetResource {
       @QueryParam("type") operationType: String,
       @QueryParam("ownerEmail") ownerEmail: String,
       @QueryParam("datasetName") datasetName: String,
-      @QueryParam("filePath") encodedUrl: String,
-      @QueryParam("uploadId") uploadId: Optional[String],
+      @QueryParam("filePath") filePath: String,
       @QueryParam("numParts") numParts: Optional[Integer],
-      payload: Map[
-        String,
-        Any
-      ], // Expecting {"parts": [...], "physicalAddress": "s3://bucket/path"}
       @Auth user: SessionUser
   ): Response = {
     val uid = user.getUid
+    val dataset: Dataset = getDatasetBy(ownerEmail, datasetName)
+
+    operationType.toLowerCase match {
+      case "init"   => initMultipartUpload(dataset.getDid, filePath, numParts, 
uid)
+      case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid)
+      case "abort"  => abortMultipartUpload(dataset.getDid, filePath, uid)
+      case _ =>
+        throw new BadRequestException("Invalid type parameter. Use 'init', 
'finish', or 'abort'.")
+    }
+  }
 
-    withTransaction(context) { ctx =>
-      val dataset = context
-        .select(DATASET.fields: _*)
-        .from(DATASET)
-        .leftJoin(USER)
-        .on(USER.UID.eq(DATASET.OWNER_UID))
-        .where(USER.EMAIL.eq(ownerEmail))
-        .and(DATASET.NAME.eq(datasetName))
-        .fetchOneInto(classOf[Dataset])
-      if (dataset == null || !userHasWriteAccess(ctx, dataset.getDid, uid)) {
-        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
-      }
+  @POST
+  @RolesAllowed(Array("REGULAR", "ADMIN"))
+  @Consumes(Array(MediaType.APPLICATION_OCTET_STREAM))
+  @Path("/multipart-upload/part")
+  def uploadPart(
+      @QueryParam("ownerEmail") datasetOwnerEmail: String,
+      @QueryParam("datasetName") datasetName: String,
+      @QueryParam("filePath") encodedFilePath: String,
+      @QueryParam("partNumber") partNumber: Int,
+      partStream: InputStream,
+      @Context headers: HttpHeaders,
+      @Auth user: SessionUser
+  ): Response = {
 
-      // Decode the file path
-      val repositoryName = dataset.getRepositoryName
-      val filePath = URLDecoder.decode(encodedUrl, 
StandardCharsets.UTF_8.name())
+    val uid = user.getUid
+    val dataset: Dataset = getDatasetBy(datasetOwnerEmail, datasetName)
+    val did = dataset.getDid
 
-      operationType.toLowerCase match {
-        case "init" =>
-          val numPartsValue = numParts.toScala.getOrElse(
-            throw new BadRequestException("numParts is required for 
initialization")
-          )
+    if (encodedFilePath == null || encodedFilePath.isEmpty)
+      throw new BadRequestException("filePath is required")
+    if (partNumber < 1)
+      throw new BadRequestException("partNumber must be >= 1")
 
-          val presignedResponse = 
LakeFSStorageClient.initiatePresignedMultipartUploads(
-            repositoryName,
-            filePath,
-            numPartsValue
-          )
-          Response
-            .ok(
-              Map(
-                "uploadId" -> presignedResponse.getUploadId,
-                "presignedUrls" -> presignedResponse.getPresignedUrls,
-                "physicalAddress" -> presignedResponse.getPhysicalAddress
-              )
-            )
-            .build()
+    val filePath = validateAndNormalizeFilePathOrThrow(
+      URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
+    )
 
-        case "finish" =>
-          val uploadIdValue = uploadId.toScala.getOrElse(
-            throw new BadRequestException("uploadId is required for 
completion")
-          )
+    val contentLength =
+      Option(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH))
+        .map(_.trim)
+        .flatMap(s => Try(s.toLong).toOption)
+        .filter(_ > 0)
+        .getOrElse {
+          throw new BadRequestException("Invalid/Missing Content-Length")
+        }
 
-          // Extract parts from the payload
-          val partsList = payload.get("parts") match {
-            case Some(rawList: List[_]) =>
-              try {
-                rawList.map {
-                  case part: Map[_, _] =>
-                    val partMap = part.asInstanceOf[Map[String, Any]]
-                    val partNumber = partMap.get("PartNumber") match {
-                      case Some(i: Int)    => i
-                      case Some(s: String) => s.toInt
-                      case _               => throw new 
BadRequestException("Invalid or missing PartNumber")
-                    }
-                    val eTag = partMap.get("ETag") match {
-                      case Some(s: String) => s
-                      case _               => throw new 
BadRequestException("Invalid or missing ETag")
-                    }
-                    (partNumber, eTag)
-
-                  case _ =>
-                    throw new BadRequestException("Each part must be a 
Map[String, Any]")
-                }
-              } catch {
-                case e: NumberFormatException =>
-                  throw new BadRequestException("PartNumber must be an 
integer", e)
-              }
-
-            case _ =>
-              throw new BadRequestException("Missing or invalid 'parts' list 
in payload")
-          }
+    withTransaction(context) { ctx =>
+      if (!userHasWriteAccess(ctx, did, uid))
+        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
 
-          // Extract physical address from payload
-          val physicalAddress = payload.get("physicalAddress") match {
-            case Some(address: String) => address
-            case _                     => throw new 
BadRequestException("Missing physicalAddress in payload")
-          }
+      val session = ctx
+        .selectFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(uid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .fetchOne()
 
-          // Complete the multipart upload with parts and physical address
-          val objectStats = 
LakeFSStorageClient.completePresignedMultipartUploads(
-            repositoryName,
-            filePath,
-            uploadIdValue,
-            partsList,
-            physicalAddress
-          )
+      if (session == null)
+        throw new NotFoundException("Upload session not found. Call type=init 
first.")
 
-          Response
-            .ok(
-              Map(
-                "message" -> "Multipart upload completed successfully",
-                "filePath" -> objectStats.getPath
-              )
+      val expectedParts = session.getNumPartsRequested
+      if (partNumber > expectedParts) {
+        throw new BadRequestException(
+          s"$partNumber exceeds the requested parts on init: $expectedParts"
+        )
+      }
+
+      if (partNumber < expectedParts && contentLength < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
+        throw new BadRequestException(
+          s"Part $partNumber is too small ($contentLength bytes). " +
+            s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART 
bytes."
+        )
+      }
+
+      val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
+      if (physicalAddr.isEmpty) {
+        throw new WebApplicationException(
+          "Upload session is missing physicalAddress. Re-init the upload.",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
+
+      val uploadId = session.getUploadId
+      val (bucket, key) =
+        try LakeFSStorageClient.parsePhysicalAddress(physicalAddr)
+        catch {
+          case e: IllegalArgumentException =>
+            throw new WebApplicationException(
+              s"Upload session has invalid physicalAddress. Re-init the 
upload. (${e.getMessage})",
+              Response.Status.INTERNAL_SERVER_ERROR
             )
-            .build()
+        }
 
-        case "abort" =>
-          val uploadIdValue = uploadId.toScala.getOrElse(
-            throw new BadRequestException("uploadId is required for abortion")
-          )
+      // Per-part lock: if another request is streaming the same part, fail fast.
+      val partRow =
+        try {
+          ctx
+            .selectFrom(DATASET_UPLOAD_SESSION_PART)
+            .where(
+              DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+                .eq(uploadId)
+                .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(partNumber))
+            )
+            .forUpdate()
+            .noWait()
+            .fetchOne()
+        } catch {
+          case e: DataAccessException
+              if Option(e.getCause)
+                .collect { case s: SQLException => s.getSQLState }
+                .contains("55P03") =>
+            throw new WebApplicationException(
+              s"Part $partNumber is already being uploaded",
+              Response.Status.CONFLICT
+            )
+        }
 
-          // Extract physical address from payload
-          val physicalAddress = payload.get("physicalAddress") match {
-            case Some(address: String) => address
-            case _                     => throw new 
BadRequestException("Missing physicalAddress in payload")
-          }
+      if (partRow == null) {
+        // Should not happen if init pre-created rows
+        throw new WebApplicationException(
+          s"Part row not initialized for part $partNumber. Re-init the 
upload.",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
 
-          // Abort the multipart upload
-          LakeFSStorageClient.abortPresignedMultipartUploads(
-            repositoryName,
-            filePath,
-            uploadIdValue,
-            physicalAddress
+      // Idempotency: if ETag already set, accept the retry quickly.
+      val existing = Option(partRow.getEtag).map(_.trim).getOrElse("")
+      if (existing.isEmpty) {
+        // Stream to S3 while holding the part lock (prevents concurrent streams for same part)
+        val response: UploadPartResponse =
+          S3StorageClient.uploadPartWithRequest(
+            bucket = bucket,
+            key = key,
+            uploadId = uploadId,
+            partNumber = partNumber,
+            inputStream = partStream,
+            contentLength = Some(contentLength)
           )
 
-          Response.ok(Map("message" -> "Multipart upload aborted 
successfully")).build()
+        val etagClean = Option(response.eTag()).map(_.replace("\"", "")).map(_.trim).getOrElse("")
+        if (etagClean.isEmpty) {
+          throw new WebApplicationException(
+            s"Missing ETag returned from S3 for part $partNumber",
+            Response.Status.INTERNAL_SERVER_ERROR
+          )
+        }
 
-        case _ =>
-          throw new BadRequestException("Invalid type parameter. Use 'init', 
'finish', or 'abort'.")
+        ctx
+          .update(DATASET_UPLOAD_SESSION_PART)
+          .set(DATASET_UPLOAD_SESSION_PART.ETAG, etagClean)
+          .where(
+            DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+              .eq(uploadId)
+              .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(partNumber))
+          )
+          .execute()
       }
+      Response.ok().build()
     }
   }
 
@@ -1014,9 +1056,8 @@ class DatasetResource {
       val ownerNode = DatasetFileNode
         .fromLakeFSRepositoryCommittedObjects(
           Map(
-            (user.getEmail, dataset.getName, latestVersion.getName) ->
-              LakeFSStorageClient
-                .retrieveObjectsOfVersion(dataset.getRepositoryName, latestVersion.getVersionHash)
+            (user.getEmail, dataset.getName, latestVersion.getName) -> LakeFSStorageClient
+              .retrieveObjectsOfVersion(dataset.getRepositoryName, latestVersion.getVersionHash)
           )
         )
         .head
@@ -1326,4 +1367,379 @@ class DatasetResource {
         Right(response)
     }
   }
+
+  // === Multipart helpers ===
+
+  private def getDatasetBy(ownerEmail: String, datasetName: String) = {
+    val dataset = context
+      .select(DATASET.fields: _*)
+      .from(DATASET)
+      .leftJoin(USER)
+      .on(USER.UID.eq(DATASET.OWNER_UID))
+      .where(USER.EMAIL.eq(ownerEmail))
+      .and(DATASET.NAME.eq(datasetName))
+      .fetchOneInto(classOf[Dataset])
+    if (dataset == null) {
+      throw new BadRequestException("Dataset not found")
+    }
+    dataset
+  }
+
+  private def validateAndNormalizeFilePathOrThrow(filePath: String): String = {
+    val path = Option(filePath).getOrElse("").replace("\\", "/")
+    if (
+      path.isEmpty ||
+      path.startsWith("/") ||
+      path.split("/").exists(seg => seg == "." || seg == "..") ||
+      path.exists(ch => ch == 0.toChar || ch < 0x20.toChar || ch == 0x7f.toChar)
+    ) throw new BadRequestException("Invalid filePath")
+    path
+  }
+
+  private def initMultipartUpload(
+      did: Integer,
+      encodedFilePath: String,
+      numParts: Optional[Integer],
+      uid: Integer
+  ): Response = {
+
+    withTransaction(context) { ctx =>
+      if (!userHasWriteAccess(ctx, did, uid)) {
+        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+      }
+
+      val dataset = getDatasetByID(ctx, did)
+      val repositoryName = dataset.getRepositoryName
+
+      val filePath =
+        validateAndNormalizeFilePathOrThrow(
+          URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
+        )
+
+      val numPartsValue = numParts.toScala.getOrElse {
+        throw new BadRequestException("numParts is required for 
initialization")
+      }
+      if (numPartsValue < 1 || numPartsValue > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) {
+        throw new BadRequestException(
+          "numParts must be between 1 and " + MAXIMUM_NUM_OF_MULTIPART_S3_PARTS
+        )
+      }
+
+      // Reject if a session already exists
+      val exists = ctx.fetchExists(
+        ctx
+          .selectOne()
+          .from(DATASET_UPLOAD_SESSION)
+          .where(
+            DATASET_UPLOAD_SESSION.UID
+              .eq(uid)
+              .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+              .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+          )
+      )
+      if (exists) {
+        throw new WebApplicationException(
+          "Upload already in progress for this filePath",
+          Response.Status.CONFLICT
+        )
+      }
+
+      val presign = LakeFSStorageClient.initiatePresignedMultipartUploads(
+        repositoryName,
+        filePath,
+        numPartsValue
+      )
+
+      val uploadIdStr = presign.getUploadId
+      val physicalAddr = presign.getPhysicalAddress
+
+      // If anything fails after this point, abort LakeFS multipart
+      try {
+        val rowsInserted = ctx
+          .insertInto(DATASET_UPLOAD_SESSION)
+          .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
+          .set(DATASET_UPLOAD_SESSION.DID, did)
+          .set(DATASET_UPLOAD_SESSION.UID, uid)
+          .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
+          .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
+          .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, numPartsValue)
+          .onDuplicateKeyIgnore()
+          .execute()
+
+        if (rowsInserted != 1) {
+          LakeFSStorageClient.abortPresignedMultipartUploads(
+            repositoryName,
+            filePath,
+            uploadIdStr,
+            physicalAddr
+          )
+          throw new WebApplicationException(
+            "Upload already in progress for this filePath",
+            Response.Status.CONFLICT
+          )
+        }
+
+        // Pre-create part rows 1..numPartsValue with empty ETag.
+        // This makes per-part locking cheap and deterministic.
+
+        val partNumberSeries = DSL.generateSeries(1, numPartsValue).asTable("gs", "pn")
+        val partNumberField = partNumberSeries.field("pn", classOf[Integer])
+
+        ctx
+          .insertInto(
+            DATASET_UPLOAD_SESSION_PART,
+            DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
+            DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
+            DATASET_UPLOAD_SESSION_PART.ETAG
+          )
+          .select(
+            ctx
+              .select(
+                inl(uploadIdStr),
+                partNumberField,
+                inl("") // placeholder empty etag
+              )
+              .from(partNumberSeries)
+          )
+          .execute()
+
+        Response.ok().build()
+      } catch {
+        case e: Exception =>
+          // rollback will remove session + parts rows; we still must abort LakeFS
+          try {
+            LakeFSStorageClient.abortPresignedMultipartUploads(
+              repositoryName,
+              filePath,
+              uploadIdStr,
+              physicalAddr
+            )
+          } catch { case _: Throwable => () }
+          throw e
+      }
+    }
+  }
+
+  private def finishMultipartUpload(
+      did: Integer,
+      encodedFilePath: String,
+      uid: Int
+  ): Response = {
+
+    val filePath = validateAndNormalizeFilePathOrThrow(
+      URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
+    )
+
+    withTransaction(context) { ctx =>
+      if (!userHasWriteAccess(ctx, did, uid)) {
+        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+      }
+
+      val dataset = getDatasetByID(ctx, did)
+
+      // Lock the session so abort/finish don't race each other
+      val session =
+        try {
+          ctx
+            .selectFrom(DATASET_UPLOAD_SESSION)
+            .where(
+              DATASET_UPLOAD_SESSION.UID
+                .eq(uid)
+                .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+                .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+            )
+            .forUpdate()
+            .noWait()
+            .fetchOne()
+        } catch {
+          case e: DataAccessException
+              if Option(e.getCause)
+                .collect { case s: SQLException => s.getSQLState }
+                .contains("55P03") =>
+            throw new WebApplicationException(
+              "Upload is already being finalized/aborted",
+              Response.Status.CONFLICT
+            )
+        }
+
+      if (session == null) {
+        throw new NotFoundException("Upload session not found or already 
finalized")
+      }
+
+      val uploadId = session.getUploadId
+      val expectedParts = session.getNumPartsRequested
+
+      val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
+      if (physicalAddr.isEmpty) {
+        throw new WebApplicationException(
+          "Upload session is missing physicalAddress. Re-init the upload.",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
+
+      val total = DSL.count()
+      val done =
+        DSL
+          .count()
+          .filterWhere(DATASET_UPLOAD_SESSION_PART.ETAG.ne(""))
+          .as("done")
+
+      val agg = ctx
+        .select(total.as("total"), done)
+        .from(DATASET_UPLOAD_SESSION_PART)
+        .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+        .fetchOne()
+
+      val totalCnt = agg.get("total", classOf[java.lang.Integer]).intValue()
+      val doneCnt = agg.get("done", classOf[java.lang.Integer]).intValue()
+
+      if (totalCnt != expectedParts) {
+        throw new WebApplicationException(
+          s"Part table mismatch: expected $expectedParts rows but found 
$totalCnt. Re-init the upload.",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
+
+      if (doneCnt != expectedParts) {
+        val missing = ctx
+          .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER)
+          .from(DATASET_UPLOAD_SESSION_PART)
+          .where(
+            DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+              .eq(uploadId)
+              .and(DATASET_UPLOAD_SESSION_PART.ETAG.eq(""))
+          )
+          .orderBy(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.asc())
+          .limit(50)
+          .fetch(DATASET_UPLOAD_SESSION_PART.PART_NUMBER)
+          .asScala
+          .toList
+
+        throw new WebApplicationException(
+          s"Upload incomplete. Some missing ETags for parts are: 
${missing.mkString(",")}",
+          Response.Status.CONFLICT
+        )
+      }
+
+      // Build partsList in order
+      val partsList: List[(Int, String)] =
+        ctx
+          .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+          .from(DATASET_UPLOAD_SESSION_PART)
+          .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+          .orderBy(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.asc())
+          .fetch()
+          .asScala
+          .map(r =>
+            (
+              r.get(DATASET_UPLOAD_SESSION_PART.PART_NUMBER).intValue(),
+              r.get(DATASET_UPLOAD_SESSION_PART.ETAG)
+            )
+          )
+          .toList
+
+      val objectStats = LakeFSStorageClient.completePresignedMultipartUploads(
+        dataset.getRepositoryName,
+        filePath,
+        uploadId,
+        partsList,
+        physicalAddr
+      )
+
+      // Cleanup: delete the session; parts are removed by ON DELETE CASCADE
+      ctx
+        .deleteFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(uid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .execute()
+
+      Response
+        .ok(
+          Map(
+            "message" -> "Multipart upload completed successfully",
+            "filePath" -> objectStats.getPath
+          )
+        )
+        .build()
+    }
+  }
+
+  private def abortMultipartUpload(
+      did: Integer,
+      encodedFilePath: String,
+      uid: Int
+  ): Response = {
+
+    val filePath = validateAndNormalizeFilePathOrThrow(
+      URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
+    )
+
+    withTransaction(context) { ctx =>
+      if (!userHasWriteAccess(ctx, did, uid)) {
+        throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+      }
+
+      val dataset = getDatasetByID(ctx, did)
+
+      val session =
+        try {
+          ctx
+            .selectFrom(DATASET_UPLOAD_SESSION)
+            .where(
+              DATASET_UPLOAD_SESSION.UID
+                .eq(uid)
+                .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+                .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+            )
+            .forUpdate()
+            .noWait()
+            .fetchOne()
+        } catch {
+          case e: DataAccessException
+              if Option(e.getCause)
+                .collect { case s: SQLException => s.getSQLState }
+                .contains("55P03") =>
+            throw new WebApplicationException(
+              "Upload is already being finalized/aborted",
+              Response.Status.CONFLICT
+            )
+        }
+
+      if (session == null) {
+        throw new NotFoundException("Upload session not found or already 
finalized")
+      }
+
+      val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("")
+      if (physicalAddr.isEmpty) {
+        throw new WebApplicationException(
+          "Upload session is missing physicalAddress. Re-init the upload.",
+          Response.Status.INTERNAL_SERVER_ERROR
+        )
+      }
+
+      LakeFSStorageClient.abortPresignedMultipartUploads(
+        dataset.getRepositoryName,
+        filePath,
+        session.getUploadId,
+        physicalAddr
+      )
+
+      // Delete session; parts removed via ON DELETE CASCADE
+      ctx
+        .deleteFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(uid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .execute()
+
+      Response.ok(Map("message" -> "Multipart upload aborted 
successfully")).build()
+    }
+  }
 }
diff --git a/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala b/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala
index fd1f0b8c90..10c68bd085 100644
--- a/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala
@@ -20,11 +20,18 @@
 package org.apache.texera.service
 
 import com.dimafeng.testcontainers._
+import io.lakefs.clients.sdk.{ApiClient, RepositoriesApi}
 import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.service.util.S3StorageClient
 import org.scalatest.{BeforeAndAfterAll, Suite}
 import org.testcontainers.containers.Network
 import org.testcontainers.utility.DockerImageName
+import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.S3Configuration
+
+import java.net.URI
 
 /**
   * Trait to spin up a LakeFS + MinIO + Postgres stack using Testcontainers,
@@ -58,9 +65,14 @@ trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self: Suit
     s"postgresql://${postgres.username}:${postgres.password}" +
       
s"@${postgres.container.getNetworkAliases.get(0)}:5432/${postgres.databaseName}"
 +
       s"?sslmode=disable"
+
   val lakefsUsername = "texera-admin"
+
+  // These are the API credentials created/used during setup.
+  // In lakeFS, the access key + secret key are used as basic-auth username/password for the API.
   val lakefsAccessKeyID = "AKIAIOSFOLKFSSAMPLES"
   val lakefsSecretAccessKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
+
   val lakefs = GenericContainer(
     dockerImage = "treeverse/lakefs:1.51",
     exposedPorts = Seq(8000),
@@ -87,11 +99,45 @@ trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self: Suit
 
   def lakefsBaseUrl: String = s"http://${lakefs.host}:${lakefs.mappedPort(8000)}"
   def minioEndpoint: String = s"http://${minio.host}:${minio.mappedPort(9000)}"
+  def lakefsApiBasePath: String = s"$lakefsBaseUrl/api/v1"
+
+  // ---- Clients (lazy so they initialize after containers are started) ----
+
+  lazy val lakefsApiClient: ApiClient = {
+    val apiClient = new ApiClient()
+    apiClient.setBasePath(lakefsApiBasePath)
+    // basic-auth for lakeFS API uses accessKey as username, secretKey as password
+    apiClient.setUsername(lakefsAccessKeyID)
+    apiClient.setPassword(lakefsSecretAccessKey)
+    apiClient
+  }
+
+  lazy val repositoriesApi: RepositoriesApi = new RepositoriesApi(lakefsApiClient)
+
+  /**
+    * S3 client instance for testing pointed at MinIO.
+    *
+    * Notes:
+    * - Region can be any value for MinIO, but MUST match what your signing expects.
+    *   so we use that.
+    * - Path-style is important: http://host:port/bucket/key
+    */
+  lazy val s3Client: S3Client = {
+    //Temporal credentials for testing purposes only
+    val creds = AwsBasicCredentials.create("texera_minio", "password")
+    S3Client
+      .builder()
+      .endpointOverride(URI.create(StorageConfig.s3Endpoint)) // set in afterStart()
+      .region(Region.US_WEST_2) // Required for `.build()`; not important in this test config.
+      .credentialsProvider(StaticCredentialsProvider.create(creds))
+      .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
+      .build()
+  }
 
   override def afterStart(): Unit = {
     super.afterStart()
 
-    // setup LakeFS
+    // setup LakeFS (idempotent-ish, but will fail if it truly cannot run)
     val lakefsSetupResult = lakefs.container.execInContainer(
       "lakefs",
       "setup",
@@ -103,16 +149,14 @@ trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self: Suit
       lakefsSecretAccessKey
     )
     if (lakefsSetupResult.getExitCode != 0) {
-      throw new RuntimeException(
-        s"Failed to setup LakeFS: ${lakefsSetupResult.getStderr}"
-      )
+      throw new RuntimeException(s"Failed to setup LakeFS: 
${lakefsSetupResult.getStderr}")
     }
 
     // replace storage endpoints in StorageConfig
     StorageConfig.s3Endpoint = minioEndpoint
-    StorageConfig.lakefsEndpoint = s"$lakefsBaseUrl/api/v1"
+    StorageConfig.lakefsEndpoint = lakefsApiBasePath
 
-    // create S3 bucket
+    // create S3 bucket used by lakeFS in tests
     S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
   }
 }
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index f526d9d561..3f72c57486 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -19,26 +19,66 @@
 
 package org.apache.texera.service.resource
 
-import jakarta.ws.rs.{BadRequestException, ForbiddenException}
+import ch.qos.logback.classic.{Level, Logger}
+import io.lakefs.clients.sdk.ApiException
+import jakarta.ws.rs._
+import jakarta.ws.rs.core.{Cookie, HttpHeaders, MediaType, MultivaluedHashMap, Response}
 import org.apache.texera.amber.core.storage.util.LakeFSStorageClient
 import org.apache.texera.auth.SessionUser
 import org.apache.texera.dao.MockTexeraDB
 import org.apache.texera.dao.jooq.generated.enums.{PrivilegeEnum, UserRoleEnum}
+import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
+import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
 import org.apache.texera.dao.jooq.generated.tables.daos.{DatasetDao, UserDao}
 import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, User}
 import org.apache.texera.service.MockLakeFS
-import org.scalatest.BeforeAndAfterAll
+import org.jooq.SQLDialect
+import org.jooq.impl.DSL
+import org.scalatest.tagobjects.Slow
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
+import org.slf4j.LoggerFactory
+
+import java.io.{ByteArrayInputStream, IOException, InputStream}
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import java.security.MessageDigest
+import java.util.concurrent.CyclicBarrier
+import java.util.{Collections, Date, Locale, Optional}
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.jdk.CollectionConverters._
+import scala.util.Random
+
+object StressMultipart extends Tag("org.apache.texera.stress.multipart")
 
 class DatasetResourceSpec
     extends AnyFlatSpec
     with Matchers
     with MockTexeraDB
     with MockLakeFS
-    with BeforeAndAfterAll {
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach {
+
+  // ---------- logging (multipart tests can be noisy) ----------
+  private var savedLevels: Map[String, Level] = Map.empty
+
+  private def setLoggerLevel(loggerName: String, newLevel: Level): Level = {
+    val logger = LoggerFactory.getLogger(loggerName).asInstanceOf[Logger]
+    val prev = logger.getLevel
+    logger.setLevel(newLevel)
+    prev
+  }
 
-  private val testUser: User = {
+  // ---------- execution context (multipart race tests) ----------
+  private implicit val ec: ExecutionContext = ExecutionContext.global
+
+  // ---------------------------------------------------------------------------
+  // Shared fixtures (DatasetResource basic tests)
+  // ---------------------------------------------------------------------------
+  private val ownerUser: User = {
     val user = new User
     user.setName("test_user")
     user.setPassword("123")
@@ -47,7 +87,7 @@ class DatasetResourceSpec
     user
   }
 
-  private val testUser2: User = {
+  private val otherAdminUser: User = {
     val user = new User
     user.setName("test_user2")
     user.setPassword("123")
@@ -56,7 +96,17 @@ class DatasetResourceSpec
     user
   }
 
-  private val testDataset: Dataset = {
+  // REGULAR user used specifically for multipart "no WRITE access" tests.
+  private val multipartNoWriteUser: User = {
+    val user = new User
+    user.setName("multipart_user2")
+    user.setPassword("123")
+    user.setEmail("[email protected]")
+    user.setRole(UserRoleEnum.REGULAR)
+    user
+  }
+
+  private val baseDataset: Dataset = {
     val dataset = new Dataset
     dataset.setName("test-dataset")
     dataset.setRepositoryName("test-dataset")
@@ -66,29 +116,84 @@ class DatasetResourceSpec
     dataset
   }
 
-  lazy val datasetDao = new DatasetDao(getDSLContext.configuration())
+  // ---------------------------------------------------------------------------
+  // Multipart fixtures
+  // ---------------------------------------------------------------------------
+  private val multipartRepoName: String =
+    s"multipart-ds-${System.nanoTime()}-${Random.alphanumeric.take(6).mkString.toLowerCase}"
 
+  private val multipartDataset: Dataset = {
+    val dataset = new Dataset
+    dataset.setName("multipart-ds")
+    dataset.setRepositoryName(multipartRepoName)
+    dataset.setIsPublic(true)
+    dataset.setIsDownloadable(true)
+    dataset.setDescription("dataset for multipart upload tests")
+    dataset
+  }
+
+  // ---------- DAOs / resource ----------
+  lazy val datasetDao = new DatasetDao(getDSLContext.configuration())
   lazy val datasetResource = new DatasetResource()
 
-  lazy val sessionUser = new SessionUser(testUser)
-  lazy val sessionUser2 = new SessionUser(testUser2)
+  // ---------- session users ----------
+  lazy val sessionUser = new SessionUser(ownerUser)
+  lazy val sessionUser2 = new SessionUser(otherAdminUser)
 
+  // Multipart callers
+  lazy val multipartOwnerSessionUser = sessionUser
+  lazy val multipartNoWriteSessionUser = new SessionUser(multipartNoWriteUser)
+
+  // ---------------------------------------------------------------------------
+  // Lifecycle
+  // ---------------------------------------------------------------------------
   override protected def beforeAll(): Unit = {
     super.beforeAll()
 
     // init db
     initializeDBAndReplaceDSLContext()
 
-    // insert test user
+    // insert users
     val userDao = new UserDao(getDSLContext.configuration())
-    userDao.insert(testUser)
-    userDao.insert(testUser2)
+    userDao.insert(ownerUser)
+    userDao.insert(otherAdminUser)
+    userDao.insert(multipartNoWriteUser)
+
+    // insert datasets (owned by ownerUser)
+    baseDataset.setOwnerUid(ownerUser.getUid)
+    multipartDataset.setOwnerUid(ownerUser.getUid)
+
+    datasetDao.insert(baseDataset)
+    datasetDao.insert(multipartDataset)
+
+    savedLevels = Map(
+      "org.apache.http.wire" -> setLoggerLevel("org.apache.http.wire", 
Level.WARN),
+      "org.apache.http.headers" -> setLoggerLevel("org.apache.http.headers", 
Level.WARN)
+    )
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+
+    // Multipart repo must exist for presigned multipart init to succeed.
+    // If it already exists, ignore 409.
+    try LakeFSStorageClient.initRepo(multipartDataset.getRepositoryName)
+    catch {
+      case e: ApiException if e.getCode == 409 => // ok
+    }
+  }
 
-    // insert test dataset
-    testDataset.setOwnerUid(testUser.getUid)
-    datasetDao.insert(testDataset)
+  override protected def afterAll(): Unit = {
+    try shutdownDB()
+    finally {
+      try savedLevels.foreach { case (name, prev) => setLoggerLevel(name, prev) } finally super
+        .afterAll()
+    }
   }
 
+  // ===========================================================================
+  // DatasetResourceSpec (original basic tests)
+  // ===========================================================================
   "createDataset" should "create dataset successfully if user does not have a 
dataset with the same name" in {
     val createDatasetRequest = DatasetResource.CreateDatasetRequest(
       datasetName = "new-dataset",
@@ -142,13 +247,11 @@ class DatasetResourceSpec
 
     val dashboardDataset = datasetResource.createDataset(createDatasetRequest, 
sessionUser)
 
-    // Verify the DashboardDataset properties
-    dashboardDataset.ownerEmail shouldEqual testUser.getEmail
+    dashboardDataset.ownerEmail shouldEqual ownerUser.getEmail
     dashboardDataset.accessPrivilege shouldEqual PrivilegeEnum.WRITE
     dashboardDataset.isOwner shouldBe true
     dashboardDataset.size shouldEqual 0
 
-    // Verify the underlying dataset properties
     dashboardDataset.dataset.getName shouldEqual "dashboard-dataset-test"
     dashboardDataset.dataset.getDescription shouldEqual "test for 
DashboardDataset properties"
     dashboardDataset.dataset.getIsPublic shouldBe true
@@ -156,51 +259,1073 @@ class DatasetResourceSpec
   }
 
   it should "delete dataset successfully if user owns it" in {
-    // insert a dataset directly into DB
     val dataset = new Dataset
     dataset.setName("delete-ds")
     dataset.setRepositoryName("delete-ds")
     dataset.setDescription("for delete test")
-    dataset.setOwnerUid(testUser.getUid)
+    dataset.setOwnerUid(ownerUser.getUid)
     dataset.setIsPublic(true)
     dataset.setIsDownloadable(true)
     datasetDao.insert(dataset)
 
-    // create repo in LakeFS to match dataset
     LakeFSStorageClient.initRepo(dataset.getRepositoryName)
 
-    // delete via DatasetResource
     val response = datasetResource.deleteDataset(dataset.getDid, sessionUser)
 
-    // assert: response OK and DB no longer contains dataset
     response.getStatus shouldEqual 200
     datasetDao.fetchOneByDid(dataset.getDid) shouldBe null
   }
 
   it should "refuse to delete dataset if not owned by user" in {
-    // insert a dataset directly into DB
     val dataset = new Dataset
     dataset.setName("user1-ds")
     dataset.setRepositoryName("user1-ds")
     dataset.setDescription("for forbidden test")
-    dataset.setOwnerUid(testUser.getUid)
+    dataset.setOwnerUid(ownerUser.getUid)
     dataset.setIsPublic(true)
     dataset.setIsDownloadable(true)
     datasetDao.insert(dataset)
 
-    // create repo in LakeFS to match dataset
     LakeFSStorageClient.initRepo(dataset.getRepositoryName)
 
-    // user2 tries to delete, should throw ForbiddenException
     assertThrows[ForbiddenException] {
       datasetResource.deleteDataset(dataset.getDid, sessionUser2)
     }
 
-    // dataset still exists in DB
     datasetDao.fetchOneByDid(dataset.getDid) should not be null
   }
 
-  override protected def afterAll(): Unit = {
-    shutdownDB()
+  // ===========================================================================
+  // Multipart upload tests (merged in)
+  // ===========================================================================
+
+  // ---------- SHA-256 Utils ----------
+  private def sha256OfChunks(chunks: Seq[Array[Byte]]): Array[Byte] = {
+    val messageDigest = MessageDigest.getInstance("SHA-256")
+    chunks.foreach(messageDigest.update)
+    messageDigest.digest()
+  }
+
+  private def sha256OfFile(path: java.nio.file.Path): Array[Byte] = {
+    val messageDigest = MessageDigest.getInstance("SHA-256")
+    val inputStream = Files.newInputStream(path)
+    try {
+      val buffer = new Array[Byte](8192)
+      var bytesRead = inputStream.read(buffer)
+      while (bytesRead != -1) {
+        messageDigest.update(buffer, 0, bytesRead)
+        bytesRead = inputStream.read(buffer)
+      }
+      messageDigest.digest()
+    } finally inputStream.close()
+  }
+
+  // ---------- helpers ----------
+  private def urlEnc(raw: String): String =
+    URLEncoder.encode(raw, StandardCharsets.UTF_8.name())
+
+  /** Minimum part-size rule (S3-style): every part except the LAST must be >= 5 MiB. */
+  private val MinNonFinalPartBytes: Int = 5 * 1024 * 1024
+  private def minPartBytes(fillByte: Byte): Array[Byte] =
+    Array.fill[Byte](MinNonFinalPartBytes)(fillByte)
+
+  private def tinyBytes(fillByte: Byte, n: Int = 1): Array[Byte] =
+    Array.fill[Byte](n)(fillByte)
+
+  /** InputStream that behaves like a mid-flight network drop after N bytes. */
+  private def flakyStream(
+      payload: Array[Byte],
+      failAfterBytes: Int,
+      msg: String = "simulated network drop"
+  ): InputStream =
+    new InputStream {
+      private var pos = 0
+      override def read(): Int = {
+        if (pos >= failAfterBytes) throw new IOException(msg)
+        if (pos >= payload.length) return -1
+        val nextByte = payload(pos) & 0xff
+        pos += 1
+        nextByte
+      }
+    }
+
+  /** Minimal HttpHeaders impl needed by DatasetResource.uploadPart */
+  private def mkHeaders(contentLength: Long): HttpHeaders =
+    new HttpHeaders {
+      private val headers = new MultivaluedHashMap[String, String]()
+      headers.putSingle(HttpHeaders.CONTENT_LENGTH, contentLength.toString)
+
+      override def getHeaderString(name: String): String = headers.getFirst(name)
+      override def getRequestHeaders = headers
+      override def getRequestHeader(name: String) =
+        Option(headers.get(name)).getOrElse(Collections.emptyList[String]())
+
+      override def getAcceptableMediaTypes = Collections.emptyList[MediaType]()
+      override def getAcceptableLanguages = Collections.emptyList[Locale]()
+      override def getMediaType: MediaType = null
+      override def getLanguage: Locale = null
+      override def getCookies = Collections.emptyMap[String, Cookie]()
+      override def getDate: Date = null
+      override def getLength: Int = contentLength.toInt
+    }
+
+  private def mkHeadersMissingContentLength: HttpHeaders =
+    new HttpHeaders {
+      private val headers = new MultivaluedHashMap[String, String]()
+      override def getHeaderString(name: String): String = null
+      override def getRequestHeaders = headers
+      override def getRequestHeader(name: String) = Collections.emptyList[String]()
+      override def getAcceptableMediaTypes = Collections.emptyList[MediaType]()
+      override def getAcceptableLanguages = Collections.emptyList[Locale]()
+      override def getMediaType: MediaType = null
+      override def getLanguage: Locale = null
+      override def getCookies = Collections.emptyMap[String, Cookie]()
+      override def getDate: Date = null
+      override def getLength: Int = -1
+    }
+
+  private def uniqueFilePath(prefix: String): String =
+    s"$prefix/${System.nanoTime()}-${Random.alphanumeric.take(8).mkString}.bin"
+
+  private def initUpload(
+      filePath: String,
+      numParts: Int,
+      user: SessionUser = multipartOwnerSessionUser
+  ): Response =
+    datasetResource.multipartUpload(
+      "init",
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc(filePath),
+      Optional.of(numParts),
+      user
+    )
+
+  private def finishUpload(
+      filePath: String,
+      user: SessionUser = multipartOwnerSessionUser
+  ): Response =
+    datasetResource.multipartUpload(
+      "finish",
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc(filePath),
+      Optional.empty(),
+      user
+    )
+
+  private def abortUpload(
+      filePath: String,
+      user: SessionUser = multipartOwnerSessionUser
+  ): Response =
+    datasetResource.multipartUpload(
+      "abort",
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc(filePath),
+      Optional.empty(),
+      user
+    )
+
+  private def uploadPart(
+      filePath: String,
+      partNumber: Int,
+      bytes: Array[Byte],
+      user: SessionUser = multipartOwnerSessionUser,
+      contentLengthOverride: Option[Long] = None,
+      missingContentLength: Boolean = false
+  ): Response = {
+    val hdrs =
+      if (missingContentLength) mkHeadersMissingContentLength
+      else mkHeaders(contentLengthOverride.getOrElse(bytes.length.toLong))
+
+    datasetResource.uploadPart(
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc(filePath),
+      partNumber,
+      new ByteArrayInputStream(bytes),
+      hdrs,
+      user
+    )
+  }
+
+  private def uploadPartWithStream(
+      filePath: String,
+      partNumber: Int,
+      stream: InputStream,
+      contentLength: Long,
+      user: SessionUser = multipartOwnerSessionUser
+  ): Response =
+    datasetResource.uploadPart(
+      ownerUser.getEmail,
+      multipartDataset.getName,
+      urlEnc(filePath),
+      partNumber,
+      stream,
+      mkHeaders(contentLength),
+      user
+    )
+
+  private def fetchSession(filePath: String) =
+    getDSLContext
+      .selectFrom(DATASET_UPLOAD_SESSION)
+      .where(
+        DATASET_UPLOAD_SESSION.UID
+          .eq(ownerUser.getUid)
+          .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+          .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+      )
+      .fetchOne()
+
+  private def fetchPartRows(uploadId: String) =
+    getDSLContext
+      .selectFrom(DATASET_UPLOAD_SESSION_PART)
+      .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+      .fetch()
+      .asScala
+      .toList
+
+  private def fetchUploadIdOrFail(filePath: String): String = {
+    val sessionRecord = fetchSession(filePath)
+    sessionRecord should not be null
+    sessionRecord.getUploadId
+  }
+
+  private def assertPlaceholdersCreated(uploadId: String, expectedParts: Int): Unit = {
+    val rows = fetchPartRows(uploadId).sortBy(_.getPartNumber)
+    rows.size shouldEqual expectedParts
+    rows.head.getPartNumber shouldEqual 1
+    rows.last.getPartNumber shouldEqual expectedParts
+    rows.foreach { r =>
+      r.getEtag should not be null
+      r.getEtag shouldEqual "" // placeholder convention
+    }
+  }
+
+  private def assertStatus(ex: WebApplicationException, status: Int): Unit =
+    ex.getResponse.getStatus shouldEqual status
+
+  // ---------------------------------------------------------------------------
+  // INIT TESTS
+  // ---------------------------------------------------------------------------
+  "multipart-upload?type=init" should "create an upload session row + precreate part placeholders (happy path)" in {
+    val filePath = uniqueFilePath("init-happy")
+    val resp = initUpload(filePath, numParts = 3)
+
+    resp.getStatus shouldEqual 200
+
+    val sessionRecord = fetchSession(filePath)
+    sessionRecord should not be null
+    sessionRecord.getNumPartsRequested shouldEqual 3
+    sessionRecord.getUploadId should not be null
+    sessionRecord.getPhysicalAddress should not be null
+
+    assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 3)
+  }
+
+  it should "reject missing numParts" in {
+    val filePath = uniqueFilePath("init-missing-numparts")
+    val ex = intercept[BadRequestException] {
+      datasetResource.multipartUpload(
+        "init",
+        ownerUser.getEmail,
+        multipartDataset.getName,
+        urlEnc(filePath),
+        Optional.empty(),
+        multipartOwnerSessionUser
+      )
+    }
+    assertStatus(ex, 400)
+  }
+
+  it should "reject invalid numParts (0, negative, too large)" in {
+    val filePath = uniqueFilePath("init-bad-numparts")
+    assertStatus(intercept[BadRequestException] { initUpload(filePath, 0) }, 400)
+    assertStatus(intercept[BadRequestException] { initUpload(filePath, -1) }, 400)
+    assertStatus(intercept[BadRequestException] { initUpload(filePath, 1000000000) }, 400)
+  }
+
+  it should "reject invalid filePath (empty, absolute, '.', '..', control 
chars)" in {
+    assertStatus(intercept[BadRequestException] { initUpload("./nope.bin", 2) 
}, 400)
+    assertStatus(intercept[BadRequestException] { initUpload("/absolute.bin", 
2) }, 400)
+    assertStatus(intercept[BadRequestException] { initUpload("a/./b.bin", 2) 
}, 400)
+
+    assertStatus(intercept[BadRequestException] { initUpload("../escape.bin", 
2) }, 400)
+    assertStatus(intercept[BadRequestException] { 
initUpload("a/../escape.bin", 2) }, 400)
+
+    assertStatus(
+      intercept[BadRequestException] {
+        initUpload(s"a/${0.toChar}b.bin", 2)
+      },
+      400
+    )
+  }
+
+  it should "reject invalid type parameter" in {
+    val filePath = uniqueFilePath("init-bad-type")
+    val ex = intercept[BadRequestException] {
+      datasetResource.multipartUpload(
+        "not-a-real-type",
+        ownerUser.getEmail,
+        multipartDataset.getName,
+        urlEnc(filePath),
+        Optional.empty(),
+        multipartOwnerSessionUser
+      )
+    }
+    assertStatus(ex, 400)
+  }
+
+  it should "reject init when caller lacks WRITE access" in {
+    val filePath = uniqueFilePath("init-forbidden")
+    val ex = intercept[ForbiddenException] {
+      initUpload(filePath, numParts = 2, user = multipartNoWriteSessionUser)
+    }
+    assertStatus(ex, 403)
+  }
+
+  it should "handle init race: exactly one succeeds, one gets 409 CONFLICT" in 
{
+    val filePath = uniqueFilePath("init-race")
+    val barrier = new CyclicBarrier(2)
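+    // Release both init calls from the barrier at once so they race to create a session for the same filePath.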
+
+    def callInit(): Either[Throwable, Response] =
+      try {
+        barrier.await()
+        Right(initUpload(filePath, numParts = 2))
+      } catch {
+        case t: Throwable => Left(t)
+      }
+
+    val future1 = Future(callInit())
+    val future2 = Future(callInit())
+    val results = Await.result(Future.sequence(Seq(future1, future2)), 30.seconds)
+
+    val oks = results.collect { case Right(r) if r.getStatus == 200 => r }
+    val fails = results.collect { case Left(t) => t }
+
+    oks.size shouldEqual 1
+    fails.size shouldEqual 1
+
+    fails.head match {
+      case e: WebApplicationException => assertStatus(e, 409)
+      case other =>
+        fail(
+          s"Expected WebApplicationException(CONFLICT), got: ${other.getClass} 
/ ${other.getMessage}"
+        )
+    }
+
+    val sessionRecord = fetchSession(filePath)
+    sessionRecord should not be null
+    assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 2)
+  }
+
+  it should "reject sequential double init with 409 CONFLICT" in {
+    val filePath = uniqueFilePath("init-double")
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+
+    val ex = intercept[WebApplicationException] { initUpload(filePath, numParts = 2) }
+    assertStatus(ex, 409)
+  }
+
+  // ---------------------------------------------------------------------------
+  // PART UPLOAD TESTS
+  // ---------------------------------------------------------------------------
+  "multipart-upload/part" should "reject uploadPart if init was not called" in {
+    val filePath = uniqueFilePath("part-no-init")
+    val ex = intercept[NotFoundException] {
+      uploadPart(filePath, partNumber = 1, bytes = Array[Byte](1, 2, 3))
+    }
+    assertStatus(ex, 404)
+  }
+
+  it should "reject missing/invalid Content-Length" in {
+    val filePath = uniqueFilePath("part-bad-cl")
+    initUpload(filePath, numParts = 2)
+
+    assertStatus(
+      intercept[BadRequestException] {
+        uploadPart(
+          filePath,
+          partNumber = 1,
+          bytes = Array[Byte](1, 2, 3),
+          missingContentLength = true
+        )
+      },
+      400
+    )
+
+    assertStatus(
+      intercept[BadRequestException] {
+        uploadPart(
+          filePath,
+          partNumber = 1,
+          bytes = Array[Byte](1, 2, 3),
+          contentLengthOverride = Some(0L)
+        )
+      },
+      400
+    )
+
+    assertStatus(
+      intercept[BadRequestException] {
+        uploadPart(
+          filePath,
+          partNumber = 1,
+          bytes = Array[Byte](1, 2, 3),
+          contentLengthOverride = Some(-5L)
+        )
+      },
+      400
+    )
+  }
+
+  it should "reject null/empty filePath param early without depending on error 
text" in {
+    val httpHeaders = mkHeaders(1L)
+
+    val ex1 = intercept[BadRequestException] {
+      datasetResource.uploadPart(
+        ownerUser.getEmail,
+        multipartDataset.getName,
+        null,
+        1,
+        new ByteArrayInputStream(Array.emptyByteArray),
+        httpHeaders,
+        multipartOwnerSessionUser
+      )
+    }
+    assertStatus(ex1, 400)
+
+    val ex2 = intercept[BadRequestException] {
+      datasetResource.uploadPart(
+        ownerUser.getEmail,
+        multipartDataset.getName,
+        "",
+        1,
+        new ByteArrayInputStream(Array.emptyByteArray),
+        httpHeaders,
+        multipartOwnerSessionUser
+      )
+    }
+    assertStatus(ex2, 400)
+  }
+
+  it should "reject invalid partNumber (< 1) and partNumber > requested" in {
+    val filePath = uniqueFilePath("part-bad-pn")
+    initUpload(filePath, numParts = 2)
+
+    assertStatus(
+      intercept[BadRequestException] {
+        uploadPart(filePath, partNumber = 0, bytes = tinyBytes(1.toByte))
+      },
+      400
+    )
+
+    assertStatus(
+      intercept[BadRequestException] {
+        uploadPart(filePath, partNumber = 3, bytes = minPartBytes(2.toByte))
+      },
+      400
+    )
+  }
+
+  it should "reject a non-final part smaller than the minimum size (without 
checking message)" in {
+    val filePath = uniqueFilePath("part-too-small-nonfinal")
+    initUpload(filePath, numParts = 2)
+
+    val ex = intercept[BadRequestException] {
+      uploadPart(filePath, partNumber = 1, bytes = tinyBytes(1.toByte))
+    }
+    assertStatus(ex, 400)
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+    fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag shouldEqual ""
+  }
+
+  it should "upload a part successfully and persist its ETag into 
DATASET_UPLOAD_SESSION_PART" in {
+    val filePath = uniqueFilePath("part-happy-db")
+    initUpload(filePath, numParts = 2)
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+    fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag shouldEqual ""
+
+    val bytes = minPartBytes(7.toByte)
+    uploadPart(filePath, partNumber = 1, bytes = bytes).getStatus shouldEqual 200
+
+    val after = fetchPartRows(uploadId).find(_.getPartNumber == 1).get
+    after.getEtag should not equal ""
+  }
+
+  it should "allow retrying the same part sequentially (no duplicates, etag 
ends non-empty)" in {
+    val filePath = uniqueFilePath("part-retry")
+    initUpload(filePath, numParts = 2)
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 1, minPartBytes(2.toByte)).getStatus shouldEqual 200
+
+    val rows = fetchPartRows(uploadId).filter(_.getPartNumber == 1)
+    rows.size shouldEqual 1
+    rows.head.getEtag should not equal ""
+  }
+
+  it should "apply per-part locking: return 409 if that part row is locked by 
another uploader" in {
+    val filePath = uniqueFilePath("part-lock")
+    initUpload(filePath, numParts = 2)
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    val connectionProvider = getDSLContext.configuration().connectionProvider()
+    val connection = connectionProvider.acquire()
+    connection.setAutoCommit(false)
+
+    try {
+      val locking = DSL.using(connection, SQLDialect.POSTGRES)
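+      // Hold a FOR UPDATE lock on the part-1 row in this separate transaction to emulate another uploader working on the same part.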
+      locking
+        .selectFrom(DATASET_UPLOAD_SESSION_PART)
+        .where(
+          DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+            .eq(uploadId)
+            .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(1))
+        )
+        .forUpdate()
+        .fetchOne()
+
+      val ex = intercept[WebApplicationException] {
+        uploadPart(filePath, 1, minPartBytes(1.toByte))
+      }
+      assertStatus(ex, 409)
+    } finally {
+      connection.rollback()
+      connectionProvider.release(connection)
+    }
+
+    uploadPart(filePath, 1, minPartBytes(3.toByte)).getStatus shouldEqual 200
+  }
+
+  it should "not block other parts: locking part 1 does not prevent uploading 
part 2" in {
+    val filePath = uniqueFilePath("part-lock-other-part")
+    initUpload(filePath, numParts = 2)
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    val connectionProvider = getDSLContext.configuration().connectionProvider()
+    val connection = connectionProvider.acquire()
+    connection.setAutoCommit(false)
+
+    try {
+      val locking = DSL.using(connection, SQLDialect.POSTGRES)
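+      // Lock only part 1's row; part 2 should still succeed because locking is per (upload_id, part_number).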
+      locking
+        .selectFrom(DATASET_UPLOAD_SESSION_PART)
+        .where(
+          DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+            .eq(uploadId)
+            .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(1))
+        )
+        .forUpdate()
+        .fetchOne()
+
+      uploadPart(filePath, 2, tinyBytes(9.toByte)).getStatus shouldEqual 200
+    } finally {
+      connection.rollback()
+      connectionProvider.release(connection)
+    }
+  }
+
+  it should "reject uploadPart when caller lacks WRITE access" in {
+    val filePath = uniqueFilePath("part-forbidden")
+    initUpload(filePath, numParts = 2)
+
+    val ex = intercept[ForbiddenException] {
+      uploadPart(filePath, 1, minPartBytes(1.toByte), user = multipartNoWriteSessionUser)
+    }
+    assertStatus(ex, 403)
+  }
+
+  // ---------------------------------------------------------------------------
+  // FINISH TESTS
+  // ---------------------------------------------------------------------------
+  "multipart-upload?type=finish" should "reject finish if init was not called" in {
+    val filePath = uniqueFilePath("finish-no-init")
+    val ex = intercept[NotFoundException] { finishUpload(filePath) }
+    assertStatus(ex, 404)
+  }
+
+  it should "reject finish when no parts were uploaded (all placeholders 
empty) without checking messages" in {
+    val filePath = uniqueFilePath("finish-no-parts")
+    initUpload(filePath, numParts = 2)
+
+    val ex = intercept[WebApplicationException] { finishUpload(filePath) }
+    assertStatus(ex, 409)
+
+    fetchSession(filePath) should not be null
+  }
+
+  it should "reject finish when some parts are missing (etag empty treated as 
missing)" in {
+    val filePath = uniqueFilePath("finish-missing")
+    initUpload(filePath, numParts = 3)
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+    val ex = intercept[WebApplicationException] { finishUpload(filePath) }
+    assertStatus(ex, 409)
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+    fetchPartRows(uploadId).find(_.getPartNumber == 2).get.getEtag shouldEqual ""
+    fetchPartRows(uploadId).find(_.getPartNumber == 3).get.getEtag shouldEqual ""
+  }
+
+  it should "reject finish when extra part rows exist in DB (bypass endpoint) 
without checking messages" in {
+    val filePath = uniqueFilePath("finish-extra-db")
+    initUpload(filePath, numParts = 2)
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200
+
+    val sessionRecord = fetchSession(filePath)
+    val uploadId = sessionRecord.getUploadId
+
+    getDSLContext
+      .insertInto(DATASET_UPLOAD_SESSION_PART)
+      .set(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID, uploadId)
+      .set(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, Integer.valueOf(3))
+      .set(DATASET_UPLOAD_SESSION_PART.ETAG, "bogus-etag")
+      .execute()
+
+    val ex = intercept[WebApplicationException] { finishUpload(filePath) }
+    assertStatus(ex, 500)
+
+    fetchSession(filePath) should not be null
+    fetchPartRows(uploadId).nonEmpty shouldEqual true
+  }
+
+  it should "finish successfully when all parts have non-empty etags; delete 
session + part rows" in {
+    val filePath = uniqueFilePath("finish-happy")
+    initUpload(filePath, numParts = 3)
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, minPartBytes(2.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 3, tinyBytes(3.toByte)).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    val resp = finishUpload(filePath)
+    resp.getStatus shouldEqual 200
+
+    fetchSession(filePath) shouldBe null
+    fetchPartRows(uploadId) shouldBe empty
+  }
+
+  it should "be idempotent-ish: second finish should return NotFound after 
successful finish" in {
+    val filePath = uniqueFilePath("finish-twice")
+    initUpload(filePath, numParts = 1)
+    uploadPart(filePath, 1, tinyBytes(1.toByte)).getStatus shouldEqual 200
+
+    finishUpload(filePath).getStatus shouldEqual 200
+
+    val ex = intercept[NotFoundException] { finishUpload(filePath) }
+    assertStatus(ex, 404)
+  }
+
+  it should "reject finish when caller lacks WRITE access" in {
+    val filePath = uniqueFilePath("finish-forbidden")
+    initUpload(filePath, numParts = 1)
+    uploadPart(filePath, 1, tinyBytes(1.toByte)).getStatus shouldEqual 200
+
+    val ex = intercept[ForbiddenException] {
+      finishUpload(filePath, user = multipartNoWriteSessionUser)
+    }
+    assertStatus(ex, 403)
+  }
+
+  it should "return 409 CONFLICT if the session row is locked by another 
finalizer/aborter" in {
+    val filePath = uniqueFilePath("finish-lock-race")
+    initUpload(filePath, numParts = 1)
+    uploadPart(filePath, 1, tinyBytes(1.toByte)).getStatus shouldEqual 200
+
+    val connectionProvider = getDSLContext.configuration().connectionProvider()
+    val connection = connectionProvider.acquire()
+    connection.setAutoCommit(false)
+
+    try {
+      val locking = DSL.using(connection, SQLDialect.POSTGRES)
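+      // Hold a FOR UPDATE lock on the session row in this separate transaction to emulate a concurrent finalizer/aborter.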
+      locking
+        .selectFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(ownerUser.getUid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .forUpdate()
+        .fetchOne()
+
+      val ex = intercept[WebApplicationException] { finishUpload(filePath) }
+      assertStatus(ex, 409)
+    } finally {
+      connection.rollback()
+      connectionProvider.release(connection)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // ABORT TESTS
+  // ---------------------------------------------------------------------------
+  "multipart-upload?type=abort" should "reject abort if init was not called" in {
+    val filePath = uniqueFilePath("abort-no-init")
+    val ex = intercept[NotFoundException] { abortUpload(filePath) }
+    assertStatus(ex, 404)
+  }
+
+  it should "abort successfully; delete session + part rows" in {
+    val filePath = uniqueFilePath("abort-happy")
+    initUpload(filePath, numParts = 2)
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    abortUpload(filePath).getStatus shouldEqual 200
+
+    fetchSession(filePath) shouldBe null
+    fetchPartRows(uploadId) shouldBe empty
+  }
+
+  it should "reject abort when caller lacks WRITE access" in {
+    val filePath = uniqueFilePath("abort-forbidden")
+    initUpload(filePath, numParts = 1)
+
+    val ex = intercept[ForbiddenException] {
+      abortUpload(filePath, user = multipartNoWriteSessionUser)
+    }
+    assertStatus(ex, 403)
+  }
+
+  it should "return 409 CONFLICT if the session row is locked by another 
finalizer/aborter" in {
+    val filePath = uniqueFilePath("abort-lock-race")
+    initUpload(filePath, numParts = 1)
+
+    val connectionProvider = getDSLContext.configuration().connectionProvider()
+    val connection = connectionProvider.acquire()
+    connection.setAutoCommit(false)
+
+    try {
+      val locking = DSL.using(connection, SQLDialect.POSTGRES)
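+      // Hold a FOR UPDATE lock on the session row in this separate transaction to emulate a concurrent finalizer/aborter.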
+      locking
+        .selectFrom(DATASET_UPLOAD_SESSION)
+        .where(
+          DATASET_UPLOAD_SESSION.UID
+            .eq(ownerUser.getUid)
+            .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+            .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+        )
+        .forUpdate()
+        .fetchOne()
+
+      val ex = intercept[WebApplicationException] { abortUpload(filePath) }
+      assertStatus(ex, 409)
+    } finally {
+      connection.rollback()
+      connectionProvider.release(connection)
+    }
+  }
+
+  it should "be consistent: abort after finish should return NotFound" in {
+    val filePath = uniqueFilePath("abort-after-finish")
+    initUpload(filePath, numParts = 1)
+    uploadPart(filePath, 1, tinyBytes(1.toByte)).getStatus shouldEqual 200
+
+    finishUpload(filePath).getStatus shouldEqual 200
+
+    val ex = intercept[NotFoundException] { abortUpload(filePath) }
+    assertStatus(ex, 404)
+  }
+
+  // ---------------------------------------------------------------------------
+  // FAILURE / RESILIENCE (still unit tests; simulated failures)
+  // ---------------------------------------------------------------------------
+  "multipart upload implementation" should "release locks and keep DB consistent if the incoming stream fails mid-upload (simulated network drop)" in {
+    val filePath = uniqueFilePath("netfail-upload-stream")
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    val payload = minPartBytes(5.toByte)
+
+    val flaky = new InputStream {
+      private var pos = 0
+      override def read(): Int = {
+        if (pos >= 1024) throw new IOException("simulated network drop")
+        val b = payload(pos) & 0xff
+        pos += 1
+        b
+      }
+    }
+
+    intercept[Throwable] {
+      uploadPartWithStream(
+        filePath,
+        partNumber = 1,
+        stream = flaky,
+        contentLength = payload.length.toLong
+      )
+    }
+
+    fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag shouldEqual ""
+
+    uploadPart(filePath, 1, payload).getStatus shouldEqual 200
+    fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag should not equal ""
+  }
+
+  it should "not delete session/parts if finalize fails downstream (simulate 
by corrupting an ETag)" in {
+    val filePath = uniqueFilePath("netfail-finish")
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+
+    getDSLContext
+      .update(DATASET_UPLOAD_SESSION_PART)
+      .set(DATASET_UPLOAD_SESSION_PART.ETAG, "definitely-not-a-real-etag")
+      .where(
+        DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+          .eq(uploadId)
+          .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(1))
+      )
+      .execute()
+
+    intercept[Throwable] { finishUpload(filePath) }
+
+    fetchSession(filePath) should not be null
+    fetchPartRows(uploadId).nonEmpty shouldEqual true
+  }
+
+  it should "allow abort + re-init after part 1 succeeded but part 2 drops 
mid-flight; then complete successfully" in {
+    val filePath = uniqueFilePath("reinit-after-part2-drop")
+
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+    val uploadId1 = fetchUploadIdOrFail(filePath)
+
+    uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+    val bytesPart2 = Array.fill[Byte](1024 * 1024)(2.toByte)
+    intercept[Throwable] {
+      uploadPartWithStream(
+        filePath,
+        partNumber = 2,
+        stream = flakyStream(bytesPart2, failAfterBytes = 4096),
+        contentLength = bytesPart2.length.toLong
+      )
+    }
+
+    abortUpload(filePath).getStatus shouldEqual 200
+    fetchSession(filePath) shouldBe null
+    fetchPartRows(uploadId1) shouldBe empty
+
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+    uploadPart(filePath, 1, minPartBytes(3.toByte)).getStatus shouldEqual 200
+    uploadPart(filePath, 2, tinyBytes(4.toByte, n = 123)).getStatus shouldEqual 200
+    finishUpload(filePath).getStatus shouldEqual 200
+    fetchSession(filePath) shouldBe null
+  }
+
+  it should "allow re-upload after failures: (1) part1 drop, (2) part2 drop, 
(3) finalize failure; each followed by abort + re-init + success" in {
+    def abortAndAssertClean(filePath: String, uploadId: String): Unit = {
+      abortUpload(filePath).getStatus shouldEqual 200
+      fetchSession(filePath) shouldBe null
+      fetchPartRows(uploadId) shouldBe empty
+    }
+
+    def reinitAndFinishHappy(filePath: String): Unit = {
+      initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+      uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200
+      uploadPart(filePath, 2, tinyBytes(8.toByte, n = 321)).getStatus shouldEqual 200
+      finishUpload(filePath).getStatus shouldEqual 200
+      fetchSession(filePath) shouldBe null
+    }
+
+    withClue("scenario (1): part1 mid-flight drop") {
+      val filePath = uniqueFilePath("reupload-part1-drop")
+      initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+      val uploadId = fetchUploadIdOrFail(filePath)
+
+      val p1 = minPartBytes(5.toByte)
+      intercept[Throwable] {
+        uploadPartWithStream(
+          filePath,
+          partNumber = 1,
+          stream = flakyStream(p1, failAfterBytes = 4096),
+          contentLength = p1.length.toLong
+        )
+      }
+
+      fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag shouldEqual ""
+
+      abortAndAssertClean(filePath, uploadId)
+      reinitAndFinishHappy(filePath)
+    }
+
+    withClue("scenario (2): part2 mid-flight drop") {
+      val filePath = uniqueFilePath("reupload-part2-drop")
+      initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+      val uploadId = fetchUploadIdOrFail(filePath)
+
+      uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+      val bytesPart2 = Array.fill[Byte](1024 * 1024)(2.toByte)
+      intercept[Throwable] {
+        uploadPartWithStream(
+          filePath,
+          partNumber = 2,
+          stream = flakyStream(bytesPart2, failAfterBytes = 4096),
+          contentLength = bytesPart2.length.toLong
+        )
+      }
+
+      abortAndAssertClean(filePath, uploadId)
+      reinitAndFinishHappy(filePath)
+    }
+
+    withClue("scenario (3): finalize failure then re-upload") {
+      val filePath = uniqueFilePath("reupload-finalize-fail")
+      initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+
+      uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+      uploadPart(filePath, 2, tinyBytes(2.toByte)).getStatus shouldEqual 200
+
+      val uploadId = fetchUploadIdOrFail(filePath)
+      getDSLContext
+        .update(DATASET_UPLOAD_SESSION_PART)
+        .set(DATASET_UPLOAD_SESSION_PART.ETAG, "definitely-not-a-real-etag")
+        .where(
+          DATASET_UPLOAD_SESSION_PART.UPLOAD_ID
+            .eq(uploadId)
+            .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(1))
+        )
+        .execute()
+
+      intercept[Throwable] { finishUpload(filePath) }
+      fetchSession(filePath) should not be null
+      fetchPartRows(uploadId).nonEmpty shouldEqual true
+
+      abortAndAssertClean(filePath, uploadId)
+      reinitAndFinishHappy(filePath)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // CORRUPTION CHECKS
+  // ---------------------------------------------------------------------------
+  it should "upload without corruption (sha256 matches final object)" in {
+    val filePath = uniqueFilePath("sha256-positive")
+    initUpload(filePath, numParts = 3).getStatus shouldEqual 200
+
+    val part1 = minPartBytes(1.toByte)
+    val part2 = minPartBytes(2.toByte)
+    val part3 = Array.fill[Byte](123)(3.toByte)
+
+    uploadPart(filePath, 1, part1).getStatus shouldEqual 200
+    uploadPart(filePath, 2, part2).getStatus shouldEqual 200
+    uploadPart(filePath, 3, part3).getStatus shouldEqual 200
+
+    finishUpload(filePath).getStatus shouldEqual 200
+
+    val expected = sha256OfChunks(Seq(part1, part2, part3))
+
+    val repoName = multipartDataset.getRepositoryName
+    val ref = "main"
+    val downloaded = LakeFSStorageClient.getFileFromRepo(repoName, ref, filePath)
+
+    val got = sha256OfFile(Paths.get(downloaded.toURI))
+    got.toSeq shouldEqual expected.toSeq
+  }
+
+  it should "detect corruption (sha256 mismatch when a part is altered)" in {
+    val filePath = uniqueFilePath("sha256-negative")
+    initUpload(filePath, numParts = 3).getStatus shouldEqual 200
+
+    val part1 = minPartBytes(1.toByte)
+    val part2 = minPartBytes(2.toByte)
+    val part3 = Array.fill[Byte](123)(3.toByte)
+
+    val intendedHash = sha256OfChunks(Seq(part1, part2, part3))
+
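+    // Flip a single bit of part 2 so the assembled object cannot hash to the intended value.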
+    val part2corrupt = part2.clone()
+    part2corrupt(0) = (part2corrupt(0) ^ 0x01).toByte
+
+    uploadPart(filePath, 1, part1).getStatus shouldEqual 200
+    uploadPart(filePath, 2, part2corrupt).getStatus shouldEqual 200
+    uploadPart(filePath, 3, part3).getStatus shouldEqual 200
+
+    finishUpload(filePath).getStatus shouldEqual 200
+
+    val repoName = multipartDataset.getRepositoryName
+    val ref = "main"
+    val downloaded = LakeFSStorageClient.getFileFromRepo(repoName, ref, filePath)
+
+    val gotHash = sha256OfFile(Paths.get(downloaded.toURI))
+    gotHash.toSeq should not equal intendedHash.toSeq
+
+    val corruptHash = sha256OfChunks(Seq(part1, part2corrupt, part3))
+    gotHash.toSeq shouldEqual corruptHash.toSeq
+  }
+
+  // ---------------------------------------------------------------------------
+  // STRESS / SOAK TESTS (tagged)
+  // ---------------------------------------------------------------------------
+  it should "survive 2 concurrent multipart uploads (fan-out)" taggedAs (StressMultipart, Slow) in {
+    val parallelUploads = 2
+    val maxParts = 2
+
+    def oneUpload(i: Int): Future[Unit] =
+      Future {
+        val filePath = uniqueFilePath(s"stress-$i")
+        val numParts = 2 + Random.nextInt(maxParts - 1)
+
+        initUpload(filePath, numParts).getStatus shouldEqual 200
+
+        val sharedMin = minPartBytes((i % 127).toByte)
+        val partFuts = (1 to numParts).map { partN =>
+          Future {
+            val bytes =
+              if (partN < numParts) sharedMin
+              else tinyBytes((partN % 127).toByte, n = 1024)
+            uploadPart(filePath, partN, bytes).getStatus shouldEqual 200
+          }
+        }
+
+        Await.result(Future.sequence(partFuts), 60.seconds)
+
+        finishUpload(filePath).getStatus shouldEqual 200
+        fetchSession(filePath) shouldBe null
+      }
+
+    val all = Future.sequence((1 to parallelUploads).map(oneUpload))
+    Await.result(all, 180.seconds)
+  }
+
+  it should "throttle concurrent uploads of the SAME part via per-part locks" 
taggedAs (StressMultipart, Slow) in {
+    val filePath = uniqueFilePath("stress-same-part")
+    initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+
+    val contenders = 2
+    val barrier = new CyclicBarrier(contenders)
+
+    def tryUploadStatus(): Future[Int] =
+      Future {
+        barrier.await()
+        try {
+          uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus
+        } catch {
+          case e: WebApplicationException => e.getResponse.getStatus
+        }
+      }
+
+    val statuses =
+      Await.result(Future.sequence((1 to contenders).map(_ => tryUploadStatus())), 60.seconds)
+
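+    // Each contender either wins the per-part lock (200) or loses it (409); at least one must win.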
+    statuses.foreach { s => s should (be(200) or be(409)) }
+    statuses.count(_ == 200) should be >= 1
+
+    val uploadId = fetchUploadIdOrFail(filePath)
+    val part1 = fetchPartRows(uploadId).find(_.getPartNumber == 1).get
+    part1.getEtag.trim should not be ""
   }
 }
diff --git a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
index b4d12f5a28..bfc97379ec 100644
--- a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
+++ b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
@@ -39,12 +39,14 @@ import { FileUploadItem } from "../../../../type/dashboard-file.interface";
 import { DatasetStagedObject } from "../../../../../common/type/dataset-staged-object";
 import { NzModalService } from "ng-zorro-antd/modal";
 import { AdminSettingsService } from "../../../../service/admin/settings/admin-settings.service";
-import { HttpErrorResponse } from "@angular/common/http";
+import { HttpErrorResponse, HttpStatusCode } from "@angular/common/http";
 import { Subscription } from "rxjs";
 import { formatSpeed, formatTime } from "src/app/common/util/format.util";
 import { format } from "date-fns";
 
 export const THROTTLE_TIME_MS = 1000;
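+// Aborts that hit HTTP 409 (backend still finalizing/aborting) are retried up to ABORT_RETRY_MAX_ATTEMPTS times,
+// with linear backoff of (attempt + 1) * ABORT_RETRY_BACKOFF_BASE_MS ms between tries (see abortWithRetry below).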
+export const ABORT_RETRY_MAX_ATTEMPTS = 10;
+export const ABORT_RETRY_BACKOFF_BASE_MS = 100;
 
 @UntilDestroy()
 @Component({
@@ -405,103 +407,107 @@ export class DatasetDetailComponent implements OnInit {
     if (this.did) {
       files.forEach(file => {
         // Check if currently uploading
-        this.cancelExistingUpload(file.name);
-
-        // Create upload function
-        const startUpload = () => {
-          this.pendingQueue = this.pendingQueue.filter(item => item.fileName !== file.name);
-
-          // Add an initializing task placeholder to uploadTasks
-          this.uploadTasks.unshift({
-            filePath: file.name,
-            percentage: 0,
-            status: "initializing",
-            uploadId: "",
-            physicalAddress: "",
-          });
-          // Start multipart upload
-          const subscription = this.datasetService
-            .multipartUpload(
-              this.ownerEmail,
-              this.datasetName,
-              file.name,
-              file.file,
-              this.chunkSizeMiB * 1024 * 1024,
-              this.maxConcurrentChunks
-            )
-            .pipe(untilDestroyed(this))
-            .subscribe({
-              next: progress => {
-                // Find the task
-                const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
-
-                if (taskIndex !== -1) {
-                  // Update the task with new progress info
-                  this.uploadTasks[taskIndex] = {
-                    ...this.uploadTasks[taskIndex],
-                    ...progress,
-                    percentage: progress.percentage ?? this.uploadTasks[taskIndex].percentage ?? 0,
-                  };
-
-                  // Auto-hide when upload is truly finished
-                  if (progress.status === "finished" && progress.totalTime) {
-                    const filename = file.name.split("/").pop() || file.name;
-                    this.uploadTimeMap.set(filename, progress.totalTime);
+        const continueWithUpload = () => {
+          // Create upload function
+          const startUpload = () => {
+            this.pendingQueue = this.pendingQueue.filter(item => item.fileName !== file.name);
+
+            // Add an initializing task placeholder to uploadTasks
+            this.uploadTasks.unshift({
+              filePath: file.name,
+              percentage: 0,
+              status: "initializing",
+            });
+            // Start multipart upload
+            const subscription = this.datasetService
+              .multipartUpload(
+                this.ownerEmail,
+                this.datasetName,
+                file.name,
+                file.file,
+                this.chunkSizeMiB * 1024 * 1024,
+                this.maxConcurrentChunks
+              )
+              .pipe(untilDestroyed(this))
+              .subscribe({
+                next: progress => {
+                  // Find the task
+                  const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
+
+                  if (taskIndex !== -1) {
+                    // Update the task with new progress info
+                    this.uploadTasks[taskIndex] = {
+                      ...this.uploadTasks[taskIndex],
+                      ...progress,
+                      percentage: progress.percentage ?? this.uploadTasks[taskIndex].percentage ?? 0,
+                    };
+
+                    // Auto-hide when upload is truly finished
+                    if (progress.status === "finished" && progress.totalTime) {
+                      const filename = file.name.split("/").pop() || file.name;
+                      this.uploadTimeMap.set(filename, progress.totalTime);
+                      this.userMakeChanges.emit();
+                      this.scheduleHide(taskIndex);
+                      this.onUploadComplete();
+                    }
+                  }
+                },
+                error: () => {
+                  // Handle upload error
+                  const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
+
+                  if (taskIndex !== -1) {
+                    this.uploadTasks[taskIndex] = {
+                      ...this.uploadTasks[taskIndex],
+                      percentage: 100,
+                      status: "aborted",
+                    };
+                    this.scheduleHide(taskIndex);
+                  }
+                  this.onUploadComplete();
+                },
+                complete: () => {
+                  const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
+                  if (taskIndex !== -1 && this.uploadTasks[taskIndex].status !== "finished") {
+                    this.uploadTasks[taskIndex].status = "finished";
                     this.userMakeChanges.emit();
                     this.scheduleHide(taskIndex);
                     this.onUploadComplete();
                   }
-                }
-              },
-              error: () => {
-                // Handle upload error
-                const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
-
-                if (taskIndex !== -1) {
-                  this.uploadTasks[taskIndex] = {
-                    ...this.uploadTasks[taskIndex],
-                    percentage: 100,
-                    status: "aborted",
-                  };
-                  this.scheduleHide(taskIndex);
-                }
-                this.onUploadComplete();
-              },
-              complete: () => {
-                const taskIndex = this.uploadTasks.findIndex(t => t.filePath === file.name);
-                if (taskIndex !== -1 && this.uploadTasks[taskIndex].status !== "finished") {
-                  this.uploadTasks[taskIndex].status = "finished";
-                  this.userMakeChanges.emit();
-                  this.scheduleHide(taskIndex);
-                  this.onUploadComplete();
-                }
-              },
-            });
-          // Store the subscription for later cleanup
-          this.uploadSubscriptions.set(file.name, subscription);
+                },
+              });
+            // Store the subscription for later cleanup
+            this.uploadSubscriptions.set(file.name, subscription);
+          };
+
+          // Queue management
+          if (this.activeUploads < this.maxConcurrentFiles) {
+            this.activeUploads++;
+            startUpload();
+          } else {
+            this.pendingQueue.push({ fileName: file.name, startUpload });
+          }
         };
 
-        // Queue management
-        if (this.activeUploads < this.maxConcurrentFiles) {
-          this.activeUploads++;
-          startUpload();
-        } else {
-          this.pendingQueue.push({ fileName: file.name, startUpload });
-        }
+        // Check if currently uploading
+        this.cancelExistingUpload(file.name, continueWithUpload);
       });
     }
   }
 
-  cancelExistingUpload(fileName: string): void {
+  cancelExistingUpload(fileName: string, onCanceled?: () => void): void {
     const task = this.uploadTasks.find(t => t.filePath === fileName);
     if (task) {
       if (task.status === "uploading" || task.status === "initializing") {
-        this.onClickAbortUploadProgress(task);
+        this.onClickAbortUploadProgress(task, onCanceled);
         return;
       }
     }
     // Remove from pending queue if present
    this.pendingQueue = this.pendingQueue.filter(item => item.fileName !== fileName);
+    if (onCanceled) {
+      onCanceled();
+    }
   }
 
   private processNextQueuedUpload(): void {
@@ -547,7 +553,7 @@ export class DatasetDetailComponent implements OnInit {
     }, 5000);
   }
 
-  onClickAbortUploadProgress(task: MultipartUploadProgress & { filePath: string }) {
+  onClickAbortUploadProgress(task: MultipartUploadProgress & { filePath: string }, onAborted?: () => void) {
     const subscription = this.uploadSubscriptions.get(task.filePath);
     if (subscription) {
       subscription.unsubscribe();
@@ -558,21 +564,54 @@ export class DatasetDetailComponent implements OnInit {
       this.onUploadComplete();
     }
 
-    this.datasetService
-      .finalizeMultipartUpload(
-        this.ownerEmail,
-        this.datasetName,
-        task.filePath,
-        task.uploadId,
-        [],
-        task.physicalAddress,
-        true // abort flag
-      )
-      .pipe(untilDestroyed(this))
-      .subscribe(() => {
-        this.notificationService.info(`${task.filePath} uploading has been terminated`);
-      });
-    // Remove the aborted task immediately
+    let doneCalled = false;
+    const done = () => {
+      if (doneCalled) {
+        return;
+      }
+      doneCalled = true;
+      if (onAborted) {
+        onAborted();
+      }
+    };
+
+    const abortWithRetry = (attempt: number) => {
+      this.datasetService
+        .finalizeMultipartUpload(
+          this.ownerEmail,
+          this.datasetName,
+          task.filePath,
+          true // abort flag
+        )
+        .pipe(untilDestroyed(this))
+        .subscribe({
+          next: () => {
+            this.notificationService.info(`${task.filePath} uploading has been terminated`);
+            done();
+          },
+          error: (res: unknown) => {
+            const err = res as HttpErrorResponse;
+
+            // Already gone, treat as done
+            if (err.status === 404) {
+              done();
+              return;
+            }
+
+            // Backend is still finalizing/aborting; retry with a tiny backoff
+            if (err.status === HttpStatusCode.Conflict && attempt < ABORT_RETRY_MAX_ATTEMPTS) {
+              setTimeout(() => abortWithRetry(attempt + 1), ABORT_RETRY_BACKOFF_BASE_MS * (attempt + 1));
+              return;
+            }
+
+            // Keep current UX: still consider it "aborted" client-side
+            done();
+          },
+        });
+    };
+
+    abortWithRetry(0);
+
    this.uploadTasks = this.uploadTasks.filter(t => t.filePath !== task.filePath);
   }
 
diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
index c09125d73b..97b2e264b7 100644
--- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
+++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
@@ -18,7 +18,7 @@
  */
 
 import { Injectable } from "@angular/core";
-import { HttpClient, HttpParams } from "@angular/common/http";
+import { HttpClient, HttpErrorResponse, HttpParams } from "@angular/common/http";
 import { catchError, map, mergeMap, switchMap, tap, toArray } from "rxjs/operators";
 import { Dataset, DatasetVersion } from "../../../../common/type/dataset";
 import { AppSettings } from "../../../../common/app-setting";
@@ -27,6 +27,7 @@ import { DashboardDataset } from "../../../type/dashboard-dataset.interface";
 import { DatasetFileNode } from "../../../../common/type/datasetVersionFileTree";
 import { DatasetStagedObject } from "../../../../common/type/dataset-staged-object";
 import { GuiConfigService } from "../../../../common/service/gui-config.service";
+import { AuthService } from "src/app/common/service/user/auth.service";
 
 export const DATASET_BASE_URL = "dataset";
 export const DATASET_CREATE_URL = DATASET_BASE_URL + "/create";
@@ -51,8 +52,6 @@ export interface MultipartUploadProgress {
   filePath: string;
   percentage: number;
   status: "initializing" | "uploading" | "finished" | "aborted";
-  uploadId: string;
-  physicalAddress: string;
   uploadSpeed?: number; // bytes per second
   estimatedTimeRemaining?: number; // seconds
   totalTime?: number; // total seconds taken
@@ -122,6 +121,7 @@ export class DatasetService {
   public retrieveAccessibleDatasets(): Observable<DashboardDataset[]> {
     return this.http.get<DashboardDataset[]>(`${AppSettings.getApiEndpoint()}/${DATASET_LIST_URL}`);
   }
+
   public createDatasetVersion(did: number, newVersion: string): Observable<DatasetVersion> {
     return this.http
       .post<{
@@ -141,6 +141,12 @@ export class DatasetService {
   /**
    * Handles multipart upload for large files using RxJS,
    * with a concurrency limit on how many parts we process in parallel.
+   *
+   * Backend flow:
+   *   POST /dataset/multipart-upload?type=init&ownerEmail=...&datasetName=...&filePath=...&numParts=N
+   *   POST /dataset/multipart-upload/part?ownerEmail=...&datasetName=...&filePath=...&partNumber=<n> (body: raw chunk)
+   *   POST /dataset/multipart-upload?type=finish&ownerEmail=...&datasetName=...&filePath=...
+   *   POST /dataset/multipart-upload?type=abort&ownerEmail=...&datasetName=...&filePath=...
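+   *
+   * Part ETags and multipart state are tracked server-side, so progress events no longer
+   * carry uploadId/physicalAddress or per-part ETags.
+   *
+   * Illustrative usage (sketch only; "data/big.bin", the 8 MiB part size and concurrency 4 are example values):
+   *   datasetService.multipartUpload(ownerEmail, datasetName, "data/big.bin", file, 8 * 1024 * 1024, 4)
+   *     .subscribe(progress => console.log(progress.status, progress.percentage));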
    */
   public multipartUpload(
     ownerEmail: string,
@@ -152,8 +158,8 @@ export class DatasetService {
   ): Observable<MultipartUploadProgress> {
     const partCount = Math.ceil(file.size / partSize);
 
-    return new Observable(observer => {
-      // Track upload progress for each part independently
+    return new Observable<MultipartUploadProgress>(observer => {
+      // Track upload progress (bytes) for each part independently
       const partProgress = new Map<number, number>();
 
       // Progress tracking state
@@ -162,8 +168,15 @@ export class DatasetService {
       let lastETA = 0;
       let lastUpdateTime = 0;
 
-      // Calculate stats with smoothing
+      const lastStats = {
+        uploadSpeed: 0,
+        estimatedTimeRemaining: 0,
+        totalTime: 0,
+      };
+
       const getTotalTime = () => (startTime ? (Date.now() - startTime) / 1000 : 0);
+
+      // Calculate stats with smoothing and simple throttling (~1s)
       const calculateStats = (totalUploaded: number) => {
         if (startTime === null) {
           startTime = Date.now();
@@ -172,25 +185,25 @@ export class DatasetService {
         const now = Date.now();
         const elapsed = getTotalTime();
 
-        // Throttle updates to every 1s
         const shouldUpdate = now - lastUpdateTime >= 1000;
         if (!shouldUpdate) {
-          return null;
+          // keep totalTime fresh even when throttled
+          lastStats.totalTime = elapsed;
+          return lastStats;
         }
         lastUpdateTime = now;
 
-        // Calculate speed with moving average
         const currentSpeed = elapsed > 0 ? totalUploaded / elapsed : 0;
         speedSamples.push(currentSpeed);
-        if (speedSamples.length > 5) speedSamples.shift();
-        const avgSpeed = speedSamples.reduce((a, b) => a + b, 0) / speedSamples.length;
+        if (speedSamples.length > 5) {
+          speedSamples.shift();
+        }
+        const avgSpeed = speedSamples.length > 0 ? speedSamples.reduce((a, b) => a + b, 0) / speedSamples.length : 0;
 
-        // Calculate smooth ETA
         const remaining = file.size - totalUploaded;
         let eta = avgSpeed > 0 ? remaining / avgSpeed : 0;
-        eta = Math.min(eta, 24 * 60 * 60); // cap ETA at 24h, 86400 sec
+        eta = Math.min(eta, 24 * 60 * 60); // cap ETA at 24h
 
-        // Smooth ETA changes (limit to 30% change)
         if (lastETA > 0 && eta > 0) {
           const maxChange = lastETA * 0.3;
           const diff = Math.abs(eta - lastETA);
@@ -200,106 +213,118 @@ export class DatasetService {
         }
         lastETA = eta;
 
-        // Near completion optimization
         const percentComplete = (totalUploaded / file.size) * 100;
         if (percentComplete > 95) {
           eta = Math.min(eta, 10);
         }
 
-        return {
-          uploadSpeed: avgSpeed,
-          estimatedTimeRemaining: Math.max(0, Math.round(eta)),
-          totalTime: elapsed,
-        };
+        lastStats.uploadSpeed = avgSpeed;
+        lastStats.estimatedTimeRemaining = Math.max(0, Math.round(eta));
+        lastStats.totalTime = elapsed;
+
+        return lastStats;
       };
 
-      const subscription = this.initiateMultipartUpload(ownerEmail, datasetName, filePath, partCount)
+      // 1. INIT: ask backend to create a LakeFS multipart upload session
+      const initParams = new HttpParams()
+        .set("type", "init")
+        .set("ownerEmail", ownerEmail)
+        .set("datasetName", datasetName)
+        .set("filePath", encodeURIComponent(filePath))
+        .set("numParts", partCount.toString());
+
+      const init$ = this.http.post<{}>(
+        `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
+        {},
+        { params: initParams }
+      );
+
+      const initWithAbortRetry$ = init$.pipe(
+        catchError((res: unknown) => {
+          const err = res as HttpErrorResponse;
+          if (err.status !== 409) {
+            return throwError(() => err);
+          }
+
+          // Init failed because a session already exists. Abort it and retry init once.
+          return this.finalizeMultipartUpload(ownerEmail, datasetName, filePath, true).pipe(
+            // best-effort abort; if abort itself fails, let the re-init decide
+            catchError(() => EMPTY),
+            switchMap(() => init$)
+          );
+        })
+      );
+
+      const subscription = initWithAbortRetry$
         .pipe(
-          switchMap(initiateResponse => {
-            const { uploadId, presignedUrls, physicalAddress } = initiateResponse;
-            if (!uploadId) {
-              observer.error(new Error("Failed to initiate multipart upload"));
-              return EMPTY;
-            }
+          switchMap(initResp => {
+            // Notify UI that upload is starting
             observer.next({
-              filePath: filePath,
+              filePath,
               percentage: 0,
               status: "initializing",
-              uploadId: uploadId,
-              physicalAddress: physicalAddress,
               uploadSpeed: 0,
               estimatedTimeRemaining: 0,
               totalTime: 0,
             });
 
-            // Keep track of all uploaded parts
-            const uploadedParts: { PartNumber: number; ETag: string }[] = [];
-
-            // 1) Convert presignedUrls into a stream of URLs
-            return from(presignedUrls).pipe(
-              // 2) Use mergeMap with concurrency limit to upload chunk by chunk
-              mergeMap((url, index) => {
+            // 2. Upload each part to /multipart-upload/part using XMLHttpRequest
+            return from(Array.from({ length: partCount }, (_, i) => i)).pipe(
+              mergeMap(index => {
                 const partNumber = index + 1;
                 const start = index * partSize;
                 const end = Math.min(start + partSize, file.size);
                 const chunk = file.slice(start, end);
 
-                // Upload the chunk
-                return new Observable(partObserver => {
+                return new Observable<void>(partObserver => {
                   const xhr = new XMLHttpRequest();
 
                   xhr.upload.addEventListener("progress", event => {
                     if (event.lengthComputable) {
-                      // Update this specific part's progress
                       partProgress.set(partNumber, event.loaded);
 
-                      // Calculate total progress across all parts
                       let totalUploaded = 0;
-                      partProgress.forEach(bytes => (totalUploaded += bytes));
+                      partProgress.forEach(bytes => {
+                        totalUploaded += bytes;
+                      });
+
+                      const percentage = Math.round((totalUploaded / file.size) * 100);
                       const stats = calculateStats(totalUploaded);
 
                       observer.next({
                         filePath,
-                        percentage: Math.min(percentage, 99), // Cap at 99% until finalized
+                        percentage: Math.min(percentage, 99),
                         status: "uploading",
-                        uploadId,
-                        physicalAddress,
                         ...stats,
                       });
                     }
                   });
 
                   xhr.addEventListener("load", () => {
-                    if (xhr.status === 200 || xhr.status === 201) {
-                      const etag = xhr.getResponseHeader("ETag")?.replace(/"/g, "");
-                      if (!etag) {
-                        partObserver.error(new Error(`Missing ETag for part ${partNumber}`));
-                        return;
-                      }
-
-                      // Mark this part as fully uploaded
+                    if (xhr.status === 200 || xhr.status === 204) {
+                      // Mark part as fully uploaded
                       partProgress.set(partNumber, chunk.size);
-                      uploadedParts.push({ PartNumber: partNumber, ETag: etag });
 
-                      // Recalculate progress
                       let totalUploaded = 0;
-                      partProgress.forEach(bytes => (totalUploaded += bytes));
-                      const percentage = Math.round((totalUploaded / file.size) * 100);
+                      partProgress.forEach(bytes => {
+                        totalUploaded += bytes;
+                      });
+
+                      // Force stats recompute on completion
                       lastUpdateTime = 0;
+                      const percentage = Math.round((totalUploaded / file.size) * 100);
                       const stats = calculateStats(totalUploaded);
 
                       observer.next({
                         filePath,
                         percentage: Math.min(percentage, 99),
                         status: "uploading",
-                        uploadId,
-                        physicalAddress,
                         ...stats,
                       });
+
                       partObserver.complete();
                     } else {
-                      partObserver.error(new Error(`Failed to upload part ${partNumber}`));
+                      partObserver.error(new Error(`Failed to upload part ${partNumber} (HTTP ${xhr.status})`));
                     }
                   });
 
@@ -309,60 +334,88 @@ export class DatasetService {
                     partObserver.error(new Error(`Failed to upload part 
${partNumber}`));
                   });
 
-                  xhr.open("PUT", url);
+                  const partUrl =
+                    `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload/part` +
+                    `?ownerEmail=${encodeURIComponent(ownerEmail)}` +
+                    `&datasetName=${encodeURIComponent(datasetName)}` +
+                    `&filePath=${encodeURIComponent(filePath)}` +
+                    `&partNumber=${partNumber}`;
+
+                  xhr.open("POST", partUrl);
+                  xhr.setRequestHeader("Content-Type", "application/octet-stream");
+                  const token = AuthService.getAccessToken();
+                  if (token) {
+                    xhr.setRequestHeader("Authorization", `Bearer ${token}`);
+                  }
                   xhr.send(chunk);
+                  return () => {
+                    try {
+                      xhr.abort();
+                    } catch {}
+                  };
                 });
               }, concurrencyLimit),
-
-              // 3) Collect results from all uploads (like forkJoin, but respects concurrency)
-              toArray(),
-              // 4) Finalize if all parts succeeded
-              switchMap(() =>
-                this.finalizeMultipartUpload(
-                  ownerEmail,
-                  datasetName,
-                  filePath,
-                  uploadId,
-                  uploadedParts,
-                  physicalAddress,
-                  false
-                )
-              ),
+              toArray(), // wait for all parts
+              // 3. FINISH: notify backend that all parts are done
+              switchMap(() => {
+                const finishParams = new HttpParams()
+                  .set("type", "finish")
+                  .set("ownerEmail", ownerEmail)
+                  .set("datasetName", datasetName)
+                  .set("filePath", encodeURIComponent(filePath));
+
+                return this.http.post(
+                  `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
+                  {},
+                  { params: finishParams }
+                );
+              }),
               tap(() => {
+                const totalTime = getTotalTime();
                 observer.next({
                   filePath,
                   percentage: 100,
                   status: "finished",
-                  uploadId: uploadId,
-                  physicalAddress: physicalAddress,
                   uploadSpeed: 0,
                   estimatedTimeRemaining: 0,
-                  totalTime: getTotalTime(),
+                  totalTime,
                 });
                 observer.complete();
               }),
               catchError((error: unknown) => {
-                // If an error occurred, abort the upload
+                // On error, compute best-effort percentage from bytes we've seen
+                let totalUploaded = 0;
+                partProgress.forEach(bytes => {
+                  totalUploaded += bytes;
+                });
+                const percentage = file.size > 0 ? Math.round((totalUploaded / file.size) * 100) : 0;
+
                 observer.next({
                   filePath,
-                  percentage: Math.round((uploadedParts.length / partCount) * 100),
+                  percentage,
                   status: "aborted",
-                  uploadId: uploadId,
-                  physicalAddress: physicalAddress,
                   uploadSpeed: 0,
                   estimatedTimeRemaining: 0,
                   totalTime: getTotalTime(),
                 });
 
-                return this.finalizeMultipartUpload(
-                  ownerEmail,
-                  datasetName,
-                  filePath,
-                  uploadId,
-                  uploadedParts,
-                  physicalAddress,
-                  true
-                ).pipe(switchMap(() => throwError(() => error)));
+                // Abort on backend
+                const abortParams = new HttpParams()
+                  .set("type", "abort")
+                  .set("ownerEmail", ownerEmail)
+                  .set("datasetName", datasetName)
+                  .set("filePath", encodeURIComponent(filePath));
+
+                return this.http
+                  .post(
+                    `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
+                    {},
+                    { params: abortParams }
+                  )
+                  .pipe(
+                    switchMap(() => throwError(() => error)),
+                    catchError(() => throwError(() => error))
+                  );
               })
             );
           })
@@ -370,59 +423,26 @@ export class DatasetService {
         .subscribe({
           error: (err: unknown) => observer.error(err),
         });
+
       return () => subscription.unsubscribe();
     });
   }
 
-  /**
-   * Initiates a multipart upload and retrieves presigned URLs for each part.
-   * @param ownerEmail Owner's email
-   * @param datasetName Dataset Name
-   * @param filePath File path within the dataset
-   * @param numParts Number of parts for the multipart upload
-   */
-  private initiateMultipartUpload(
-    ownerEmail: string,
-    datasetName: string,
-    filePath: string,
-    numParts: number
-  ): Observable<{ uploadId: string; presignedUrls: string[]; physicalAddress: string }> {
-    const params = new HttpParams()
-      .set("type", "init")
-      .set("ownerEmail", ownerEmail)
-      .set("datasetName", datasetName)
-      .set("filePath", encodeURIComponent(filePath))
-      .set("numParts", numParts.toString());
-
-    return this.http.post<{ uploadId: string; presignedUrls: string[]; physicalAddress: string }>(
-      `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
-      {},
-      { params }
-    );
-  }
-
-  /**
-   * Completes or aborts a multipart upload, sending part numbers and ETags to the backend.
-   */
   public finalizeMultipartUpload(
     ownerEmail: string,
     datasetName: string,
     filePath: string,
-    uploadId: string,
-    parts: { PartNumber: number; ETag: string }[],
-    physicalAddress: string,
     isAbort: boolean
   ): Observable<Response> {
     const params = new HttpParams()
       .set("type", isAbort ? "abort" : "finish")
       .set("ownerEmail", ownerEmail)
       .set("datasetName", datasetName)
-      .set("filePath", encodeURIComponent(filePath))
-      .set("uploadId", uploadId);
+      .set("filePath", encodeURIComponent(filePath));
 
     return this.http.post<Response>(
       `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
-      { parts, physicalAddress },
+      {},
       { params }
     );
   }
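
    As a usage illustration (not part of the diff), a caller could abort a stale upload
    session through the public finalizeMultipartUpload method above; the service instance
    name and the argument values below are hypothetical, only the method signature comes
    from the code in this commit:

        // Sketch only: "datasetService", the email, dataset name, and file path are made-up examples.
        // Passing isAbort = true results in a type=abort request to /dataset/multipart-upload for that file.
        datasetService
          .finalizeMultipartUpload("owner@example.com", "demo-dataset", "data/sample.csv", true)
          .subscribe({
            next: () => console.log("multipart upload session aborted"),
            error: (err: unknown) => console.error("abort request failed", err),
          });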
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index 48e51dca87..57ac69b687 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -58,6 +58,9 @@ DROP TABLE IF EXISTS workflow_version CASCADE;
 DROP TABLE IF EXISTS project CASCADE;
 DROP TABLE IF EXISTS workflow_of_project CASCADE;
 DROP TABLE IF EXISTS workflow_executions CASCADE;
+DROP TABLE IF EXISTS dataset_upload_session CASCADE;
+DROP TABLE IF EXISTS dataset_upload_session_part CASCADE;
+
 DROP TABLE IF EXISTS dataset CASCADE;
 DROP TABLE IF EXISTS dataset_user_access CASCADE;
 DROP TABLE IF EXISTS dataset_version CASCADE;
@@ -275,6 +278,36 @@ CREATE TABLE IF NOT EXISTS dataset_version
     FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE
     );
 
+CREATE TABLE IF NOT EXISTS dataset_upload_session
+(
+    did              INT NOT NULL,
+    uid              INT NOT NULL,
+    file_path        TEXT NOT NULL,
+    upload_id        VARCHAR(256) NOT NULL UNIQUE,
+    physical_address TEXT,
+    num_parts_requested INT NOT NULL,
+
+    PRIMARY KEY (uid, did, file_path),
+
+    FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE,
+    FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE
+);
+
+CREATE TABLE IF NOT EXISTS dataset_upload_session_part
+(
+    upload_id   VARCHAR(256) NOT NULL,
+    part_number INT          NOT NULL,
+    etag        TEXT         NOT NULL DEFAULT '',
+
+    PRIMARY KEY (upload_id, part_number),
+
+    CONSTRAINT chk_part_number_positive CHECK (part_number > 0),
+
+    FOREIGN KEY (upload_id)
+        REFERENCES dataset_upload_session(upload_id)
+        ON DELETE CASCADE
+);
+
 -- operator_executions (modified to match MySQL: no separate primary key; added console_messages_uri)
 CREATE TABLE IF NOT EXISTS operator_executions
 (
diff --git a/sql/updates/17.sql b/sql/updates/17.sql
new file mode 100644
index 0000000000..9436c40528
--- /dev/null
+++ b/sql/updates/17.sql
@@ -0,0 +1,66 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+
+SET search_path TO texera_db;
+
+-- ============================================
+-- 2. Update the table schema
+-- ============================================
+BEGIN;
+
+-- 1. Drop old tables (if exist)
+DROP TABLE IF EXISTS dataset_upload_session CASCADE;
+DROP TABLE IF EXISTS dataset_upload_session_part CASCADE;
+
+-- 2. Create dataset upload session table
+CREATE TABLE IF NOT EXISTS dataset_upload_session
+(
+    did                 INT          NOT NULL,
+    uid                 INT          NOT NULL,
+    file_path           TEXT         NOT NULL,
+    upload_id           VARCHAR(256) NOT NULL UNIQUE,
+    physical_address    TEXT,
+    num_parts_requested INT          NOT NULL,
+
+    PRIMARY KEY (uid, did, file_path),
+
+    FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE,
+    FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE
+    );
+
+-- 3. Create dataset upload session parts table
+CREATE TABLE IF NOT EXISTS dataset_upload_session_part
+(
+    upload_id   VARCHAR(256) NOT NULL,
+    part_number INT          NOT NULL,
+    etag        TEXT         NOT NULL DEFAULT '',
+
+    PRIMARY KEY (upload_id, part_number),
+
+    CONSTRAINT chk_part_number_positive CHECK (part_number > 0),
+
+    FOREIGN KEY (upload_id)
+    REFERENCES dataset_upload_session(upload_id)
+    ON DELETE CASCADE
+    );
+
+COMMIT;
