This is an automated email from the ASF dual-hosted git repository.
aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 830e3090b1 feat(backend): Resumable Uploads (#4181)
830e3090b1 is described below
commit 830e3090b1b8a419f294a008a01a153259a64b63
Author: carloea2 <[email protected]>
AuthorDate: Wed Feb 4 21:51:42 2026 -0800
feat(backend): Resumable Uploads (#4181)
---
.../texera/service/util/S3StorageClient.scala | 1 +
.../texera/service/resource/DatasetResource.scala | 339 ++++++++----
.../service/resource/DatasetResourceSpec.scala | 581 ++++++++++++++++++++-
frontend/src/app/app.module.ts | 2 +
.../conflicting-file-modal-content.component.html | 35 ++
.../conflicting-file-modal-content.component.scss | 22 +
.../conflicting-file-modal-content.component.ts | 37 ++
.../files-uploader/files-uploader.component.ts | 214 ++++++--
.../dataset-detail.component.ts | 31 +-
.../service/user/dataset/dataset.service.ts | 91 ++--
.../app/dashboard/type/dashboard-file.interface.ts | 1 +
sql/texera_ddl.sql | 1 +
sql/updates/20.sql | 37 ++
13 files changed, 1176 insertions(+), 216 deletions(-)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index b7a66a1bc8..f3d252d413 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -38,6 +38,7 @@ import scala.jdk.CollectionConverters._
object S3StorageClient {
val MINIMUM_NUM_OF_MULTIPART_S3_PART: Long = 5L * 1024 * 1024 // 5 MiB
val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000
+ val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6
// Initialize MinIO-compatible S3 Client
private lazy val s3Client: S3Client = {
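
The new PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS constant is what the DatasetResource changes below use to decide whether an existing upload session's presigned physical address has gone stale. A minimal sketch of that check, assuming created_at is read as a java.time.OffsetDateTime as in the diff (the standalone helper is illustrative, not part of the commit):

    import java.time.OffsetDateTime

    // Illustrative helper: a session whose created_at is older than
    // PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS (6 hours) counts as expired,
    // mirroring the isExpired computation added to initMultipartUpload below.
    def isSessionExpired(createdAt: OffsetDateTime, expirationHrs: Long = 6L): Boolean =
      createdAt.plusHours(expirationHrs).isBefore(OffsetDateTime.now(createdAt.getOffset))
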
diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index f39885367c..a60bc07adf 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -51,11 +51,12 @@ import org.apache.texera.service.resource.DatasetResource.{context, _}
import org.apache.texera.service.util.S3StorageClient
import org.apache.texera.service.util.S3StorageClient.{
MAXIMUM_NUM_OF_MULTIPART_S3_PARTS,
- MINIMUM_NUM_OF_MULTIPART_S3_PART
+ MINIMUM_NUM_OF_MULTIPART_S3_PART,
+ PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS
}
import org.jooq.impl.DSL
import org.jooq.impl.DSL.{inline => inl}
-import org.jooq.{DSLContext, EnumType}
+import org.jooq.{DSLContext, EnumType, Record2, Result}
import java.io.{InputStream, OutputStream}
import java.net.{HttpURLConnection, URI, URL, URLDecoder}
@@ -73,8 +74,10 @@ import org.jooq.exception.DataAccessException
import software.amazon.awssdk.services.s3.model.UploadPartResponse
import org.apache.commons.io.FilenameUtils
import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
+import org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord
import java.sql.SQLException
+import java.time.OffsetDateTime
import scala.util.Try
object DatasetResource {
@@ -693,14 +696,16 @@ class DatasetResource {
@QueryParam("filePath") filePath: String,
@QueryParam("fileSizeBytes") fileSizeBytes: Optional[java.lang.Long],
@QueryParam("partSizeBytes") partSizeBytes: Optional[java.lang.Long],
+ @QueryParam("restart") restart: Optional[java.lang.Boolean],
@Auth user: SessionUser
): Response = {
val uid = user.getUid
val dataset: Dataset = getDatasetBy(ownerEmail, datasetName)
operationType.toLowerCase match {
+ case "list" => listMultipartUploads(dataset.getDid, uid)
case "init" =>
- initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, partSizeBytes, uid)
+ initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, partSizeBytes, restart, uid)
case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid)
case "abort" => abortMultipartUpload(dataset.getDid, filePath, uid)
case _ =>
@@ -1488,11 +1493,38 @@ class DatasetResource {
dataset
}
+ private def listMultipartUploads(did: Integer, requesterUid: Int): Response = {
+ withTransaction(context) { ctx =>
+ if (!userHasWriteAccess(ctx, did, requesterUid)) {
+ throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+ }
+
+ val filePaths =
+ ctx
+ .selectDistinct(DATASET_UPLOAD_SESSION.FILE_PATH)
+ .from(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.DID.eq(did))
+ .and(
+ DSL.condition(
+ "created_at > current_timestamp - (? * interval '1 hour')",
+ PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS
+ )
+ )
+ .orderBy(DATASET_UPLOAD_SESSION.FILE_PATH.asc())
+ .fetch(DATASET_UPLOAD_SESSION.FILE_PATH)
+ .asScala
+ .toList
+
+ Response.ok(Map("filePaths" -> filePaths.asJava)).build()
+ }
+ }
+
private def initMultipartUpload(
did: Integer,
encodedFilePath: String,
fileSizeBytes: Optional[java.lang.Long],
partSizeBytes: Optional[java.lang.Long],
+ restart: Optional[java.lang.Boolean],
uid: Integer
): Response = {
@@ -1509,27 +1541,17 @@ class DatasetResource {
URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
)
- val fileSizeBytesValue: Long =
- fileSizeBytes
- .orElseThrow(() =>
- new BadRequestException("fileSizeBytes is required for
initialization")
- )
-
- if (fileSizeBytesValue <= 0L) {
- throw new BadRequestException("fileSizeBytes must be > 0")
- }
+ if (fileSizeBytes == null || !fileSizeBytes.isPresent)
+ throw new BadRequestException("fileSizeBytes is required for initialization")
+ if (partSizeBytes == null || !partSizeBytes.isPresent)
+ throw new BadRequestException("partSizeBytes is required for initialization")
- val partSizeBytesValue: Long =
- partSizeBytes
- .orElseThrow(() =>
- new BadRequestException("partSizeBytes is required for
initialization")
- )
+ val fileSizeBytesValue: Long = fileSizeBytes.get.longValue()
+ val partSizeBytesValue: Long = partSizeBytes.get.longValue()
- if (partSizeBytesValue <= 0L) {
- throw new BadRequestException("partSizeBytes must be > 0")
- }
+ if (fileSizeBytesValue <= 0L) throw new BadRequestException("fileSizeBytes must be > 0")
+ if (partSizeBytesValue <= 0L) throw new BadRequestException("partSizeBytes must be > 0")
- // singleFileUploadMaxBytes applies to TOTAL bytes (sum of all parts == file size)
val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx)
if (totalMaxBytes <= 0L) {
throw new WebApplicationException(
@@ -1543,7 +1565,6 @@ class DatasetResource {
)
}
- // Compute numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize
val addend: Long = partSizeBytesValue - 1L
if (addend < 0L || fileSizeBytesValue > Long.MaxValue - addend) {
throw new WebApplicationException(
@@ -1558,111 +1579,229 @@ class DatasetResource {
s"Computed numParts=$numPartsLong is out of range
1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
)
}
- val numPartsValue: Int = numPartsLong.toInt
+ val computedNumParts: Int = numPartsLong.toInt
- // S3 multipart constraint: all non-final parts must be >= 5MiB.
- // If we have >1 parts, then partSizeBytesValue is the non-final part size.
- if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
+ if (computedNumParts > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) {
throw new BadRequestException(
s"partSizeBytes=$partSizeBytesValue is too small. " +
s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART bytes."
)
}
-
- // Reject if a session already exists
- val exists = ctx.fetchExists(
- ctx
- .selectOne()
- .from(DATASET_UPLOAD_SESSION)
+ var session: DatasetUploadSessionRecord = null
+ var rows: Result[Record2[Integer, String]] = null
+ try {
+ session = ctx
+ .selectFrom(DATASET_UPLOAD_SESSION)
.where(
DATASET_UPLOAD_SESSION.UID
.eq(uid)
.and(DATASET_UPLOAD_SESSION.DID.eq(did))
.and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
)
- )
- if (exists) {
- throw new WebApplicationException(
- "Upload already in progress for this filePath",
- Response.Status.CONFLICT
- )
- }
-
- val presign = withLakeFSErrorHandling {
- LakeFSStorageClient.initiatePresignedMultipartUploads(
- repositoryName,
- filePath,
- numPartsValue
- )
+ .forUpdate()
+ .noWait()
+ .fetchOne()
+ if (session != null) {
+ // Gain parts lock
+ rows = ctx
+ .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+ .from(DATASET_UPLOAD_SESSION_PART)
+ .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(session.getUploadId))
+ .forUpdate()
+ .noWait()
+ .fetch()
+ val dbFileSize = session.getFileSizeBytes
+ val dbPartSize = session.getPartSizeBytes
+ val dbNumParts = session.getNumPartsRequested
+ val createdAt: OffsetDateTime = session.getCreatedAt
+
+ val isExpired =
+ createdAt
+ .plusHours(PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS.toLong)
+ .isBefore(OffsetDateTime.now(createdAt.getOffset)) // or OffsetDateTime.now()
+
+ val conflictConfig =
+ dbFileSize != fileSizeBytesValue ||
+ dbPartSize != partSizeBytesValue ||
+ dbNumParts != computedNumParts ||
+ isExpired ||
+ Option(restart).exists(_.orElse(false))
+
+ if (conflictConfig) {
+ // Parts will be deleted automatically (ON DELETE CASCADE)
+ ctx
+ .deleteFrom(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId))
+ .execute()
+
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repositoryName,
+ filePath,
+ session.getUploadId,
+ session.getPhysicalAddress
+ )
+ } catch { case _: Throwable => () }
+ session = null
+ rows = null
+ }
+ }
+ } catch {
+ case e: DataAccessException
+ if Option(e.getCause)
+ .collect { case s: SQLException => s.getSQLState }
+ .contains("55P03") =>
+ throw new WebApplicationException(
+ "Another client is uploading this file",
+ Response.Status.CONFLICT
+ )
}
- val uploadIdStr = presign.getUploadId
- val physicalAddr = presign.getPhysicalAddress
-
- try {
- val rowsInserted = ctx
- .insertInto(DATASET_UPLOAD_SESSION)
- .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
- .set(DATASET_UPLOAD_SESSION.DID, did)
- .set(DATASET_UPLOAD_SESSION.UID, uid)
- .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
- .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
- .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue))
- .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
- .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
- .onDuplicateKeyIgnore()
- .execute()
-
- if (rowsInserted != 1) {
- LakeFSStorageClient.abortPresignedMultipartUploads(
+ if (session == null) {
+ val presign = withLakeFSErrorHandling {
+ LakeFSStorageClient.initiatePresignedMultipartUploads(
repositoryName,
filePath,
- uploadIdStr,
- physicalAddr
- )
- throw new WebApplicationException(
- "Upload already in progress for this filePath",
- Response.Status.CONFLICT
+ computedNumParts
)
}
- // Pre-create part rows 1..numPartsValue with empty ETag.
- // This makes per-part locking cheap and deterministic.
+ val uploadIdStr = presign.getUploadId
+ val physicalAddr = presign.getPhysicalAddress
+
+ try {
+ val rowsInserted = ctx
+ .insertInto(DATASET_UPLOAD_SESSION)
+ .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
+ .set(DATASET_UPLOAD_SESSION.DID, did)
+ .set(DATASET_UPLOAD_SESSION.UID, uid)
+ .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
+ .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
+ .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(computedNumParts))
+ .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue))
+ .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue))
+ .onDuplicateKeyIgnore()
+ .execute()
- val partNumberSeries = DSL.generateSeries(1, numPartsValue).asTable("gs", "pn")
- val partNumberField = partNumberSeries.field("pn", classOf[Integer])
+ if (rowsInserted == 1) {
+ val partNumberSeries =
+ DSL.generateSeries(1, computedNumParts).asTable("gs", "partNumberField")
+ val partNumberField = partNumberSeries.field("partNumberField", classOf[Integer])
- ctx
- .insertInto(
- DATASET_UPLOAD_SESSION_PART,
- DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
- DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
- DATASET_UPLOAD_SESSION_PART.ETAG
- )
- .select(
ctx
+ .insertInto(
+ DATASET_UPLOAD_SESSION_PART,
+ DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
+ DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
+ DATASET_UPLOAD_SESSION_PART.ETAG
+ )
.select(
- inl(uploadIdStr),
- partNumberField,
- inl("") // placeholder empty etag
+ ctx
+ .select(
+ inl(uploadIdStr),
+ partNumberField,
+ inl("")
+ )
+ .from(partNumberSeries)
)
- .from(partNumberSeries)
- )
- .execute()
+ .execute()
+
+ session = ctx
+ .selectFrom(DATASET_UPLOAD_SESSION)
+ .where(
+ DATASET_UPLOAD_SESSION.UID
+ .eq(uid)
+ .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+ .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+ )
+ .fetchOne()
+ } else {
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repositoryName,
+ filePath,
+ uploadIdStr,
+ physicalAddr
+ )
+ } catch { case _: Throwable => () }
+
+ session = ctx
+ .selectFrom(DATASET_UPLOAD_SESSION)
+ .where(
+ DATASET_UPLOAD_SESSION.UID
+ .eq(uid)
+ .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+ .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+ )
+ .fetchOne()
+ }
+ } catch {
+ case e: Exception =>
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repositoryName,
+ filePath,
+ uploadIdStr,
+ physicalAddr
+ )
+ } catch { case _: Throwable => () }
+ throw e
+ }
+ }
- Response.ok().build()
- } catch {
- case e: Exception =>
+ if (session == null) {
+ throw new WebApplicationException(
+ "Failed to create or locate upload session",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+
+ val dbNumParts = session.getNumPartsRequested
+
+ val uploadId = session.getUploadId
+ val nParts = dbNumParts
+
+ // CHANGED: lock rows with NOWAIT; if any row is locked by another uploader -> 409
+ if (rows == null) {
+ rows =
try {
- LakeFSStorageClient.abortPresignedMultipartUploads(
- repositoryName,
- filePath,
- uploadIdStr,
- physicalAddr
- )
- } catch { case _: Throwable => () }
- throw e
+ ctx
+ .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER, DATASET_UPLOAD_SESSION_PART.ETAG)
+ .from(DATASET_UPLOAD_SESSION_PART)
+ .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+ .forUpdate()
+ .noWait()
+ .fetch()
+ } catch {
+ case e: DataAccessException
+ if Option(e.getCause)
+ .collect { case s: SQLException => s.getSQLState }
+ .contains("55P03") =>
+ throw new WebApplicationException(
+ "Another client is uploading parts for this file",
+ Response.Status.CONFLICT
+ )
+ }
}
+
+ // CHANGED: compute missingParts + completedPartsCount from the SAME query result
+ val missingParts = rows.asScala
+ .filter(r =>
+ Option(r.get(DATASET_UPLOAD_SESSION_PART.ETAG)).map(_.trim).getOrElse("").isEmpty
+ )
+ .map(r => r.get(DATASET_UPLOAD_SESSION_PART.PART_NUMBER).intValue())
+ .toList
+
+ val completedPartsCount = nParts - missingParts.size
+
+ Response
+ .ok(
+ Map(
+ "missingParts" -> missingParts.asJava,
+ "completedPartsCount" -> Integer.valueOf(completedPartsCount)
+ )
+ )
+ .build()
}
}
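
The resumable init path above leans on Postgres row locking: the session row and its part rows are read with SELECT ... FOR UPDATE NOWAIT, and a lock-not-available failure (SQLSTATE 55P03) is surfaced to the client as HTTP 409 CONFLICT. A condensed sketch of that mapping, assuming the jOOQ DataAccessException and the javax.ws.rs flavor of JAX-RS (swap to jakarta.ws.rs if the project uses it; the wrapper itself is illustrative):

    import java.sql.SQLException
    import javax.ws.rs.WebApplicationException
    import javax.ws.rs.core.Response
    import org.jooq.exception.DataAccessException

    // Illustrative wrapper: run a SELECT ... FOR UPDATE NOWAIT block and translate
    // Postgres "lock not available" (SQLSTATE 55P03) into a 409 CONFLICT, as the
    // init path does for both the session row and its part rows.
    def withRowLockOr409[T](conflictMessage: String)(block: => T): T =
      try block
      catch {
        case e: DataAccessException
            if Option(e.getCause).collect { case s: SQLException => s.getSQLState }.contains("55P03") =>
          throw new WebApplicationException(conflictMessage, Response.Status.CONFLICT)
      }
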
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index 0d37298e9e..c03a6d4cb6 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -32,6 +32,7 @@ import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
import org.apache.texera.dao.jooq.generated.tables.daos.{DatasetDao, DatasetVersionDao, UserDao}
import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, DatasetVersion, User}
import org.apache.texera.service.MockLakeFS
+import org.apache.texera.service.util.S3StorageClient
import org.jooq.SQLDialect
import org.jooq.impl.DSL
import org.scalatest.flatspec.AnyFlatSpec
@@ -62,6 +63,51 @@ class DatasetResourceSpec
with BeforeAndAfterAll
with BeforeAndAfterEach {
+ // ---------- Response entity helpers ----------
+ private def entityAsScalaMap(resp: Response): Map[String, Any] = {
+ resp.getEntity match {
+ case m: scala.collection.Map[_, _] =>
+ m.asInstanceOf[scala.collection.Map[String, Any]].toMap
+ case m: java.util.Map[_, _] =>
+ m.asScala.toMap.asInstanceOf[Map[String, Any]]
+ case null => Map.empty
+ case other =>
+ fail(s"Unexpected response entity type: ${other.getClass}")
+ }
+ }
+
+ private def mapListOfInts(x: Any): List[Int] =
+ x match {
+ case l: java.util.List[_] => l.asScala.map(_.toString.toInt).toList
+ case l: scala.collection.Seq[_] => l.map(_.toString.toInt).toList
+ case other => fail(s"Expected list, got: ${other.getClass}")
+ }
+
+ private def mapListOfStrings(x: Any): List[String] =
+ x match {
+ case l: java.util.List[_] => l.asScala.map(_.toString).toList
+ case l: scala.collection.Seq[_] => l.map(_.toString).toList
+ case other => fail(s"Expected list, got: ${other.getClass}")
+ }
+
+ private def listUploads(
+ user: SessionUser = multipartOwnerSessionUser
+ ): List[String] = {
+ val resp = datasetResource.multipartUpload(
+ "list",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc("ignored"),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ user
+ )
+ resp.getStatus shouldEqual 200
+ val m = entityAsScalaMap(resp)
+ mapListOfStrings(m("filePaths"))
+ }
+
// ---------- logging (multipart tests can be noisy) ----------
private var savedLevels: Map[String, Level] = Map.empty
@@ -473,7 +519,8 @@ class DatasetResourceSpec
numParts: Int,
lastPartBytes: Int = 1,
partSizeBytes: Int = MinNonFinalPartBytes,
- user: SessionUser = multipartOwnerSessionUser
+ user: SessionUser = multipartOwnerSessionUser,
+ restart: Optional[java.lang.Boolean] = Optional.empty()
): Response = {
require(numParts >= 1, "numParts must be >= 1")
require(lastPartBytes > 0, "lastPartBytes must be > 0")
@@ -499,6 +546,24 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.of(java.lang.Long.valueOf(fileSizeBytes)),
Optional.of(java.lang.Long.valueOf(maxPartSizeBytes)),
+ restart,
+ user
+ )
+ }
+ private def initRaw(
+ filePath: String,
+ fileSizeBytes: Long,
+ partSizeBytes: Long,
+ user: SessionUser = multipartOwnerSessionUser
+ ): Response = {
+ datasetResource.multipartUpload(
+ "init",
+ ownerUser.getEmail,
+ multipartDataset.getName,
+ urlEnc(filePath),
+ Optional.of(java.lang.Long.valueOf(fileSizeBytes)),
+ Optional.of(java.lang.Long.valueOf(partSizeBytes)),
+ Optional.empty(),
user
)
}
@@ -514,6 +579,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.empty(),
Optional.empty(),
+ Optional.empty(),
user
)
@@ -528,6 +594,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.empty(),
Optional.empty(),
+ Optional.empty(),
user
)
@@ -617,6 +684,74 @@ class DatasetResourceSpec
private def assertStatus(ex: WebApplicationException, status: Int): Unit =
ex.getResponse.getStatus shouldEqual status
+ // ---------------------------------------------------------------------------
+ // LIST TESTS (type=list)
+ // ---------------------------------------------------------------------------
+ "multipart-upload?type=list" should "return empty when no active sessions
exist for the dataset" in {
+ // Make deterministic: remove any leftover sessions from other tests.
+ getDSLContext
+ .deleteFrom(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+ .execute()
+
+ listUploads() shouldBe empty
+ }
+
+ it should "reject list when caller lacks WRITE access" in {
+ val ex = intercept[ForbiddenException] {
+ listUploads(user = multipartNoWriteSessionUser)
+ }
+ ex.getResponse.getStatus shouldEqual 403
+ }
+
+ it should "return only non-expired sessions, sorted by filePath (and exclude
expired ones)" in {
+ // Clean slate
+ getDSLContext
+ .deleteFrom(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+ .execute()
+
+ val fpA = uniqueFilePath("list-a")
+ val fpB = uniqueFilePath("list-b")
+
+ initUpload(fpB, numParts = 2).getStatus shouldEqual 200
+ initUpload(fpA, numParts = 2).getStatus shouldEqual 200
+
+ // Expire fpB by pushing created_at back > 6 hours.
+ val uploadIdB = fetchUploadIdOrFail(fpB)
+ val tableName = DATASET_UPLOAD_SESSION.getName // typically "dataset_upload_session"
+ getDSLContext
+ .update(DATASET_UPLOAD_SESSION)
+ .set(
+ DATASET_UPLOAD_SESSION.CREATED_AT,
+ DSL.field("current_timestamp - interval '7
hours'").cast(classOf[java.time.OffsetDateTime])
+ )
+ .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadIdB))
+ .execute()
+
+ val listed = listUploads()
+ listed shouldEqual listed.sorted
+ listed should contain(fpA)
+ listed should not contain fpB
+ }
+
+ it should "not list sessions after abort (cleanup works end-to-end)" in {
+ // Clean slate
+ getDSLContext
+ .deleteFrom(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+ .execute()
+
+ val fp = uniqueFilePath("list-after-abort")
+ initUpload(fp, numParts = 2).getStatus shouldEqual 200
+
+ listUploads() should contain(fp)
+
+ abortUpload(fp).getStatus shouldEqual 200
+
+ listUploads() should not contain fp
+ }
+
// ---------------------------------------------------------------------------
// INIT TESTS
// ---------------------------------------------------------------------------
@@ -634,6 +769,345 @@ class DatasetResourceSpec
assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 3)
}
+ it should "restart session when restart=true is explicitly requested (even
if config is unchanged) and reset progress" in {
+ val filePath = uniqueFilePath("init-restart-true")
+
+ // Initial init
+ initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus shouldEqual 200
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+
+ // Make progress in old session
+ uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+ fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be ""
+
+ // Re-init with same config but restart=true => must restart
+ val r2 = initUpload(
+ filePath,
+ numParts = 2,
+ lastPartBytes = 123,
+ restart = Optional.of(java.lang.Boolean.TRUE)
+ )
+ r2.getStatus shouldEqual 200
+
+ val newUploadId = fetchUploadIdOrFail(filePath)
+ newUploadId should not equal oldUploadId
+
+ // Old part rows gone, new placeholders empty
+ fetchPartRows(oldUploadId) shouldBe empty
+ assertPlaceholdersCreated(newUploadId, expectedParts = 2)
+
+ // Response should look like a fresh session
+ val m = entityAsScalaMap(r2)
+ mapListOfInts(m("missingParts")) shouldEqual List(1, 2)
+ m("completedPartsCount").toString.toInt shouldEqual 0
+ }
+
+ it should "not restart session when restart=false (same config) and preserve
uploadId + progress" in {
+ val filePath = uniqueFilePath("init-restart-false")
+
+ initUpload(filePath, numParts = 3, lastPartBytes = 123).getStatus shouldEqual 200
+ val uploadId1 = fetchUploadIdOrFail(filePath)
+
+ uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200
+
+ val r2 = initUpload(
+ filePath,
+ numParts = 3,
+ lastPartBytes = 123,
+ restart = Optional.of(java.lang.Boolean.FALSE)
+ )
+ r2.getStatus shouldEqual 200
+
+ val uploadId2 = fetchUploadIdOrFail(filePath)
+ uploadId2 shouldEqual uploadId1
+
+ val m = entityAsScalaMap(r2)
+ mapListOfInts(m("missingParts")) shouldEqual List(2, 3)
+ m("completedPartsCount").toString.toInt shouldEqual 1
+ }
+
+ it should "restart even when all parts were already uploaded (restart=true
makes missingParts full again)" in {
+ val filePath = uniqueFilePath("init-restart-after-all-parts")
+
+ initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus
shouldEqual 200
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+
+ // Upload everything (but don't finish)
+ uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+ uploadPart(filePath, 2, tinyBytes(2.toByte, n = 123)).getStatus
shouldEqual 200
+
+ // Confirm "all done" without restart
+ val rNoRestart = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+ rNoRestart.getStatus shouldEqual 200
+ val mNoRestart = entityAsScalaMap(rNoRestart)
+ mapListOfInts(mNoRestart("missingParts")) shouldEqual Nil
+ mNoRestart("completedPartsCount").toString.toInt shouldEqual 2
+
+ // Now force restart => must reset
+ val rRestart = initUpload(
+ filePath,
+ numParts = 2,
+ lastPartBytes = 123,
+ restart = Optional.of(java.lang.Boolean.TRUE)
+ )
+ rRestart.getStatus shouldEqual 200
+
+ val newUploadId = fetchUploadIdOrFail(filePath)
+ newUploadId should not equal oldUploadId
+ fetchPartRows(oldUploadId) shouldBe empty
+ assertPlaceholdersCreated(newUploadId, expectedParts = 2)
+
+ val m = entityAsScalaMap(rRestart)
+ mapListOfInts(m("missingParts")) shouldEqual List(1, 2)
+ m("completedPartsCount").toString.toInt shouldEqual 0
+ }
+
+ "multipart-upload?type=init" should "restart session when init config
changes (fileSize/partSize/numParts) and recreate placeholders" in {
+ val filePath = uniqueFilePath("init-conflict-restart")
+
+ // First init => 2 parts
+ initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus
shouldEqual 200
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+
+ // Upload part 1 so old session isn't empty
+ uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+ fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim
should not be ""
+
+ // Second init with DIFFERENT config => 3 parts now
+ val resp2 = initUpload(filePath, numParts = 3, lastPartBytes = 50)
+ resp2.getStatus shouldEqual 200
+
+ val newUploadId = fetchUploadIdOrFail(filePath)
+ newUploadId should not equal oldUploadId
+
+ // Old part rows should have been deleted via ON DELETE CASCADE
+ fetchPartRows(oldUploadId) shouldBe empty
+
+ // New placeholders should exist and be empty
+ assertPlaceholdersCreated(newUploadId, expectedParts = 3)
+
+ val m2 = entityAsScalaMap(resp2)
+ mapListOfInts(m2("missingParts")) shouldEqual List(1, 2, 3)
+ m2("completedPartsCount").toString.toInt shouldEqual 0
+ }
+
+ it should "restart session when physicalAddress has expired (created_at too
old), even if config is unchanged" in {
+ val filePath = uniqueFilePath("init-expired-restart")
+
+ // First init (2 parts)
+ val r1 = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+ r1.getStatus shouldEqual 200
+
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+ oldUploadId should not be null
+
+ // Optional: create some progress so we know it truly resets
+ uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+ fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be ""
+
+ // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6)
+ val expireHrs = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS
+
+ getDSLContext
+ .update(DATASET_UPLOAD_SESSION)
+ .set(
+ DATASET_UPLOAD_SESSION.CREATED_AT,
+ DSL
+ .field(s"current_timestamp - interval '${expireHrs + 1} hours'")
+ .cast(classOf[java.time.OffsetDateTime])
+ )
+ .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(oldUploadId))
+ .execute()
+
+ // Same init config again -> should restart because it's expired
+ val r2 = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+ r2.getStatus shouldEqual 200
+
+ val newUploadId = fetchUploadIdOrFail(filePath)
+ newUploadId should not equal oldUploadId
+
+ // Old part rows should have been deleted (ON DELETE CASCADE)
+ fetchPartRows(oldUploadId) shouldBe empty
+
+ // New placeholders should exist, empty
+ assertPlaceholdersCreated(newUploadId, expectedParts = 2)
+
+ // Response should reflect a fresh session
+ val m2 = entityAsScalaMap(r2)
+ mapListOfInts(m2("missingParts")) shouldEqual List(1, 2)
+ m2("completedPartsCount").toString.toInt shouldEqual 0
+ }
+
+ it should "be resumable: repeated init with same config keeps uploadId and
returns missingParts + completedPartsCount" in {
+ val filePath = uniqueFilePath("init-resume-same-config")
+
+ val resp1 = initUpload(filePath, numParts = 3, lastPartBytes = 123)
+ resp1.getStatus shouldEqual 200
+ val uploadId1 = fetchUploadIdOrFail(filePath)
+
+ uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+ val resp2 = initUpload(filePath, numParts = 3, lastPartBytes = 123)
+ resp2.getStatus shouldEqual 200
+ val uploadId2 = fetchUploadIdOrFail(filePath)
+
+ uploadId2 shouldEqual uploadId1
+
+ val m2 = entityAsScalaMap(resp2)
+ val missing = mapListOfInts(m2("missingParts"))
+ missing shouldEqual List(2, 3)
+ m2("completedPartsCount").toString.toInt shouldEqual 1
+ }
+ it should "return missingParts=[] when all parts are already uploaded
(completedPartsCount == numParts)" in {
+ val filePath = uniqueFilePath("init-all-done")
+ initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus
shouldEqual 200
+
+ uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200
+ uploadPart(filePath, 2, tinyBytes(8.toByte, n = 123)).getStatus
shouldEqual 200
+
+ val resp2 = initUpload(filePath, numParts = 2, lastPartBytes = 123)
+ resp2.getStatus shouldEqual 200
+
+ val m2 = entityAsScalaMap(resp2)
+ mapListOfInts(m2("missingParts")) shouldEqual Nil
+ m2("completedPartsCount").toString.toInt shouldEqual 2
+ }
+ it should "return 409 CONFLICT if the upload session row is locked by
another transaction" in {
+ val filePath = uniqueFilePath("init-session-row-locked")
+ initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+
+ val connectionProvider = getDSLContext.configuration().connectionProvider()
+ val connection = connectionProvider.acquire()
+ connection.setAutoCommit(false)
+
+ try {
+ val locking = DSL.using(connection, SQLDialect.POSTGRES)
+ locking
+ .selectFrom(DATASET_UPLOAD_SESSION)
+ .where(
+ DATASET_UPLOAD_SESSION.UID
+ .eq(ownerUser.getUid)
+ .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+ .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+ )
+ .forUpdate()
+ .fetchOne()
+
+ val ex = intercept[WebApplicationException] {
+ initUpload(filePath, numParts = 2)
+ }
+ ex.getResponse.getStatus shouldEqual 409
+ } finally {
+ connection.rollback()
+ connectionProvider.release(connection)
+ }
+
+ // lock released => init works again
+ initUpload(filePath, numParts = 2).getStatus shouldEqual 200
+ }
+ it should "treat normalized-equivalent paths as the same session (no
duplicate sessions)" in {
+ val base = s"norm-${System.nanoTime()}.bin"
+ val raw = s"a/../$base" // normalizes to base
+
+ // init using traversal-ish but normalizable path
+ initUpload(raw, numParts = 1, lastPartBytes = 16, partSizeBytes =
16).getStatus shouldEqual 200
+ val uploadId1 = fetchUploadIdOrFail(base) // stored path should be
normalized
+
+ // init using normalized path should hit the same session (resume)
+ val resp2 = initUpload(base, numParts = 1, lastPartBytes = 16,
partSizeBytes = 16)
+ resp2.getStatus shouldEqual 200
+ val uploadId2 = fetchUploadIdOrFail(base)
+
+ uploadId2 shouldEqual uploadId1
+
+ val m2 = entityAsScalaMap(resp2)
+ mapListOfInts(m2("missingParts")) shouldEqual List(1)
+ m2("completedPartsCount").toString.toInt shouldEqual 0
+ }
+ it should "restart session when fileSizeBytes differs (single-part;
computedNumParts unchanged)" in {
+ val filePath = uniqueFilePath("init-conflict-filesize")
+
+ val declared = 16
+ val r1 = initRaw(filePath, fileSizeBytes = declared, partSizeBytes = 32L)
// numParts=1
+ r1.getStatus shouldEqual 200
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+
+ // Add progress in old session
+ uploadPart(filePath, 1, Array.fill[Byte](declared)(1.toByte)).getStatus
shouldEqual 200
+
+ fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim
should not be ""
+
+ val r2 = initRaw(filePath, fileSizeBytes = 17L, partSizeBytes = 32L) //
numParts=1 still
+ r2.getStatus shouldEqual 200
+ val newUploadId = fetchUploadIdOrFail(filePath)
+
+ newUploadId should not equal oldUploadId
+ fetchPartRows(oldUploadId) shouldBe empty // old placeholders removed
+
+ val session = fetchSession(filePath)
+ session.getFileSizeBytes shouldEqual 17L
+ session.getPartSizeBytes shouldEqual 32L
+ session.getNumPartsRequested shouldEqual 1
+
+ val m = entityAsScalaMap(r2)
+ mapListOfInts(m("missingParts")) shouldEqual List(1)
+ m("completedPartsCount").toString.toInt shouldEqual 0 // progress reset
+ }
+
+ it should "restart session when partSizeBytes differs (single-part;
computedNumParts unchanged)" in {
+ val filePath = uniqueFilePath("init-conflict-partsize")
+
+ initRaw(filePath, fileSizeBytes = 16L, partSizeBytes = 32L).getStatus
shouldEqual 200
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+
+ // Second init, same fileSize, different partSize, still 1 part
+ val r2 = initRaw(filePath, fileSizeBytes = 16L, partSizeBytes = 64L)
+ r2.getStatus shouldEqual 200
+ val newUploadId = fetchUploadIdOrFail(filePath)
+
+ newUploadId should not equal oldUploadId
+ fetchPartRows(oldUploadId) shouldBe empty
+
+ val session = fetchSession(filePath)
+ session.getFileSizeBytes shouldEqual 16L
+ session.getPartSizeBytes shouldEqual 64L
+ session.getNumPartsRequested shouldEqual 1
+
+ val m = entityAsScalaMap(r2)
+ mapListOfInts(m("missingParts")) shouldEqual List(1)
+ m("completedPartsCount").toString.toInt shouldEqual 0
+ }
+ it should "restart session when computed numParts differs (multipart ->
single-part)" in {
+ val filePath = uniqueFilePath("init-conflict-numparts")
+
+ val partSize = MinNonFinalPartBytes.toLong // 5 MiB
+ val fileSize = partSize * 2L + 123L // => computedNumParts = 3
+
+ val r1 = initRaw(filePath, fileSizeBytes = fileSize, partSizeBytes =
partSize)
+ r1.getStatus shouldEqual 200
+ val oldUploadId = fetchUploadIdOrFail(filePath)
+ fetchSession(filePath).getNumPartsRequested shouldEqual 3
+
+ // Create progress
+ uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200
+
+ // Re-init with a partSize >= fileSize => computedNumParts becomes 1
+ val r2 = initRaw(filePath, fileSizeBytes = fileSize, partSizeBytes = fileSize)
+ r2.getStatus shouldEqual 200
+ val newUploadId = fetchUploadIdOrFail(filePath)
+
+ newUploadId should not equal oldUploadId
+ fetchPartRows(oldUploadId) shouldBe empty
+
+ val session = fetchSession(filePath)
+ session.getNumPartsRequested shouldEqual 1
+ session.getFileSizeBytes shouldEqual fileSize
+ session.getPartSizeBytes shouldEqual fileSize
+
+ val m = entityAsScalaMap(r2)
+ mapListOfInts(m("missingParts")) shouldEqual List(1)
+ m("completedPartsCount").toString.toInt shouldEqual 0
+ }
it should "reject missing fileSizeBytes / partSizeBytes" in {
val filePath1 = uniqueFilePath("init-missing-filesize")
@@ -645,6 +1119,7 @@ class DatasetResourceSpec
urlEnc(filePath1),
Optional.empty(),
Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)),
+ Optional.empty(),
multipartOwnerSessionUser
)
}
@@ -659,6 +1134,7 @@ class DatasetResourceSpec
urlEnc(filePath2),
Optional.of(java.lang.Long.valueOf(1L)),
Optional.empty(),
+ Optional.empty(),
multipartOwnerSessionUser
)
}
@@ -677,6 +1153,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.of(java.lang.Long.valueOf(0L)),
Optional.of(java.lang.Long.valueOf(1L)),
+ Optional.empty(),
multipartOwnerSessionUser
)
},
@@ -692,6 +1169,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.of(java.lang.Long.valueOf(1L)),
Optional.of(java.lang.Long.valueOf(0L)),
+ Optional.empty(),
multipartOwnerSessionUser
)
},
@@ -715,6 +1193,7 @@ class DatasetResourceSpec
urlEnc(filePathOver),
Optional.of(java.lang.Long.valueOf(oneMiB + 1L)),
Optional.of(java.lang.Long.valueOf(oneMiB + 1L)), // single-part
+ Optional.empty(),
multipartOwnerSessionUser
)
},
@@ -730,6 +1209,7 @@ class DatasetResourceSpec
urlEnc(filePathEq),
Optional.of(java.lang.Long.valueOf(oneMiB)),
Optional.of(java.lang.Long.valueOf(oneMiB)), // single-part
+ Optional.empty(),
multipartOwnerSessionUser
)
@@ -752,6 +1232,7 @@ class DatasetResourceSpec
urlEnc(filePathEq),
Optional.of(java.lang.Long.valueOf(max6MiB)),
Optional.of(java.lang.Long.valueOf(partSize)),
+ Optional.empty(),
multipartOwnerSessionUser
)
@@ -768,6 +1249,7 @@ class DatasetResourceSpec
urlEnc(filePathOver),
Optional.of(java.lang.Long.valueOf(max6MiB + 1L)),
Optional.of(java.lang.Long.valueOf(partSize)),
+ Optional.empty(),
multipartOwnerSessionUser
)
},
@@ -790,6 +1272,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.of(java.lang.Long.valueOf(totalMaxBytes)),
Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)),
+ Optional.empty(),
multipartOwnerSessionUser
)
}
@@ -821,6 +1304,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.empty(),
Optional.empty(),
+ Optional.empty(),
multipartOwnerSessionUser
)
}
@@ -835,7 +1319,7 @@ class DatasetResourceSpec
assertStatus(ex, 403)
}
- it should "handle init race: exactly one succeeds, one gets 409 CONFLICT" in
{
+ it should "handle init race: concurrent init calls converge to a single
session (both return 200)" in {
val filePath = uniqueFilePath("init-race")
val barrier = new CyclicBarrier(2)
@@ -851,31 +1335,61 @@ class DatasetResourceSpec
val future2 = Future(callInit())
val results = Await.result(Future.sequence(Seq(future1, future2)), 30.seconds)
- val oks = results.collect { case Right(r) if r.getStatus == 200 => r }
+ // No unexpected failures
val fails = results.collect { case Left(t) => t }
-
- oks.size shouldEqual 1
- fails.size shouldEqual 1
-
- fails.head match {
- case e: WebApplicationException => assertStatus(e, 409)
- case other =>
- fail(
- s"Expected WebApplicationException(CONFLICT), got: ${other.getClass}
/ ${other.getMessage}"
- )
+ withClue(s"init race failures: ${fails.map(_.getMessage).mkString(", ")}")
{
+ fails shouldBe empty
}
+ // Both should be OK
+ val oks = results.collect { case Right(r) => r }
+ oks.size shouldEqual 2
+ oks.foreach(_.getStatus shouldEqual 200)
+
+ // Exactly one session row exists for this file path
val sessionRecord = fetchSession(filePath)
sessionRecord should not be null
+
+ // Placeholders created for expected parts
assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 2)
+
+ // Both responses should report missingParts [1,2] and completedPartsCount 0
+ oks.foreach { r =>
+ val m = entityAsScalaMap(r)
+ mapListOfInts(m("missingParts")) shouldEqual List(1, 2)
+ m("completedPartsCount").toString.toInt shouldEqual 0
+ }
}
- it should "reject sequential double init with 409 CONFLICT" in {
- val filePath = uniqueFilePath("init-double")
+ it should "return 409 if init cannot acquire the session row lock (NOWAIT)"
in {
+ val filePath = uniqueFilePath("init-lock-409")
initUpload(filePath, numParts = 2).getStatus shouldEqual 200
- val ex = intercept[WebApplicationException] { initUpload(filePath, numParts = 2) }
- assertStatus(ex, 409)
+ val connectionProvider = getDSLContext.configuration().connectionProvider()
+ val connection = connectionProvider.acquire()
+ connection.setAutoCommit(false)
+
+ try {
+ val locking = DSL.using(connection, SQLDialect.POSTGRES)
+ locking
+ .selectFrom(DATASET_UPLOAD_SESSION)
+ .where(
+ DATASET_UPLOAD_SESSION.UID
+ .eq(ownerUser.getUid)
+ .and(DATASET_UPLOAD_SESSION.DID.eq(multipartDataset.getDid))
+ .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+ )
+ .forUpdate()
+ .fetchOne()
+
+ val ex = intercept[WebApplicationException] {
+ initUpload(filePath, numParts = 2)
+ }
+ ex.getResponse.getStatus shouldEqual 409
+ } finally {
+ connection.rollback()
+ connectionProvider.release(connection)
+ }
}
// ---------------------------------------------------------------------------
@@ -1160,6 +1674,38 @@ class DatasetResourceSpec
assertStatus(ex, 403)
}
+ "multipart-upload/part" should "treat retries as idempotent once ETag is set
(no overwrite on second call)" in {
+ val filePath = uniqueFilePath("part-idempotent")
+ initUpload(
+ filePath,
+ numParts = 1,
+ lastPartBytes = 16,
+ partSizeBytes = 16
+ ).getStatus shouldEqual 200
+
+ val uploadId = fetchUploadIdOrFail(filePath)
+
+ val n = 16
+ val bytes1: Array[Byte] = Array.tabulate[Byte](n)(i => (i + 1).toByte)
+ val bytes2: Array[Byte] = Array.tabulate[Byte](n)(i => (i + 1).toByte)
+
+ uploadPart(filePath, 1, bytes1).getStatus shouldEqual 200
+ val etag1 = fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag
+
+ uploadPart(filePath, 1, bytes2).getStatus shouldEqual 200
+ val etag2 = fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag
+
+ etag2 shouldEqual etag1
+
+ finishUpload(filePath).getStatus shouldEqual 200
+
+ val repoName = multipartDataset.getRepositoryName
+ val downloaded = LakeFSStorageClient.getFileFromRepo(repoName, "main",
filePath)
+ val gotBytes = Files.readAllBytes(Paths.get(downloaded.toURI))
+
+ gotBytes.toSeq shouldEqual bytes1.toSeq
+ }
+
// ---------------------------------------------------------------------------
// FINISH TESTS
// ---------------------------------------------------------------------------
@@ -1183,6 +1729,7 @@ class DatasetResourceSpec
urlEnc(filePath),
Optional.of(java.lang.Long.valueOf(twoMiB)),
Optional.of(java.lang.Long.valueOf(twoMiB)),
+ Optional.empty(),
multipartOwnerSessionUser
)
.getStatus shouldEqual 200
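
Several of the init tests above hinge on the backend's part-count arithmetic: numParts = ceil(fileSizeBytes / partSizeBytes), computed as (fileSizeBytes + partSizeBytes - 1) / partSizeBytes with an overflow guard, then bounds-checked against 1..MAXIMUM_NUM_OF_MULTIPART_S3_PARTS. A small sketch of that computation, assuming the 10,000-part cap from S3StorageClient (the standalone function is illustrative):

    // Illustrative: ceil(fileSize / partSize) via (fileSize + partSize - 1) / partSize,
    // guarding the addition against Long overflow, then bounds-checking the result.
    def computeNumParts(fileSizeBytes: Long, partSizeBytes: Long, maxParts: Int = 10000): Either[String, Int] = {
      val addend = partSizeBytes - 1L
      if (addend < 0L || fileSizeBytes > Long.MaxValue - addend)
        Left("fileSizeBytes + partSizeBytes - 1 would overflow Long")
      else {
        val numParts = (fileSizeBytes + addend) / partSizeBytes
        if (numParts < 1L || numParts > maxParts.toLong) Left(s"numParts=$numParts is out of range 1..$maxParts")
        else Right(numParts.toInt)
      }
    }
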
diff --git a/frontend/src/app/app.module.ts b/frontend/src/app/app.module.ts
index 9ddf4bbcc2..a591207f60 100644
--- a/frontend/src/app/app.module.ts
+++ b/frontend/src/app/app.module.ts
@@ -139,6 +139,7 @@ import { OverlayModule } from "@angular/cdk/overlay";
import { HighlightSearchTermsPipe } from "./dashboard/component/user/user-workflow/user-workflow-list-item/highlight-search-terms.pipe";
import { en_US, provideNzI18n } from "ng-zorro-antd/i18n";
import { FilesUploaderComponent } from "./dashboard/component/user/files-uploader/files-uploader.component";
+import { ConflictingFileModalContentComponent } from "./dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component";
import { UserDatasetComponent } from "./dashboard/component/user/user-dataset/user-dataset.component";
import { UserDatasetVersionCreatorComponent } from "./dashboard/component/user/user-dataset/user-dataset-explorer/user-dataset-version-creator/user-dataset-version-creator.component";
import { DatasetDetailComponent } from "./dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component";
@@ -236,6 +237,7 @@ registerLocaleData(en);
NgbdModalAddProjectWorkflowComponent,
NgbdModalRemoveProjectWorkflowComponent,
FilesUploaderComponent,
+ ConflictingFileModalContentComponent,
UserDatasetComponent,
UserDatasetVersionCreatorComponent,
DatasetDetailComponent,
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html
new file mode 100644
index 0000000000..1d5f8b849a
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.html
@@ -0,0 +1,35 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!doctype html>
+<html lang="en">
+ <head>
+ <meta charset="UTF-8" />
+ <title>Title</title>
+ </head>
+ <body>
+ <div>
+ <div><b>File:</b> {{ data.fileName }}</div>
+ <div><b>Path:</b> {{ data.path }}</div>
+ <div><b>Size:</b> {{ data.size }}</div>
+
+ <div class="hint">An upload session already exists for this path.</div>
+ </div>
+ </body>
+</html>
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.scss b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.scss
new file mode 100644
index 0000000000..34b713cf44
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.scss
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+.hint {
+ margin-top: 8px;
+}
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts
new file mode 100644
index 0000000000..b418929120
--- /dev/null
+++ b/frontend/src/app/dashboard/component/user/files-uploader/conflicting-file-modal-content/conflicting-file-modal-content.component.ts
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { ChangeDetectionStrategy, Component, inject } from "@angular/core";
+import { NZ_MODAL_DATA } from "ng-zorro-antd/modal";
+
+export interface ConflictingFileModalData {
+ fileName: string;
+ path: string;
+ size: string;
+}
+
+@Component({
+ selector: "texera-conflicting-file-modal-content",
+ templateUrl: "./conflicting-file-modal-content.component.html",
+ styleUrls: ["./conflicting-file-modal-content.component.scss"],
+ changeDetection: ChangeDetectionStrategy.OnPush,
+})
+export class ConflictingFileModalContentComponent {
+ readonly data: ConflictingFileModalData = inject(NZ_MODAL_DATA);
+}
diff --git a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts
index 0fbd00a357..216b592ee0 100644
--- a/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts
+++ b/frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts
@@ -17,13 +17,22 @@
* under the License.
*/
-import { Component, EventEmitter, Input, Output } from "@angular/core";
-import { FileUploadItem } from "../../../type/dashboard-file.interface";
+import { Component, EventEmitter, Host, Input, Optional, Output } from "@angular/core";
+import { firstValueFrom } from "rxjs";
import { NgxFileDropEntry } from "ngx-file-drop";
+import { NzModalRef, NzModalService } from "ng-zorro-antd/modal";
+import { FileUploadItem } from "../../../type/dashboard-file.interface";
import { DatasetFileNode } from "../../../../common/type/datasetVersionFileTree";
import { NotificationService } from "../../../../common/service/notification/notification.service";
import { AdminSettingsService } from "../../../service/admin/settings/admin-settings.service";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
+import { DatasetService } from "../../../service/user/dataset/dataset.service";
+import { DatasetDetailComponent } from "../user-dataset/user-dataset-explorer/dataset-detail.component";
+import { formatSize } from "../../../../common/util/size-formatter.util";
+import {
+ ConflictingFileModalContentComponent,
+ ConflictingFileModalData,
+} from "./conflicting-file-modal-content/conflicting-file-modal-content.component";
@UntilDestroy()
@Component({
@@ -32,23 +41,23 @@ import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
styleUrls: ["./files-uploader.component.scss"],
})
export class FilesUploaderComponent {
- @Input()
- showUploadAlert: boolean = false;
+ @Input() showUploadAlert: boolean = false;
- @Output()
- uploadedFiles = new EventEmitter<FileUploadItem[]>();
+ @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
newUploadFileTreeNodes: DatasetFileNode[] = [];
fileUploadingFinished: boolean = false;
- // four types: "success", "info", "warning" and "error"
fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
fileUploadBannerMessage: string = "";
singleFileUploadMaxSizeMiB: number = 20;
constructor(
private notificationService: NotificationService,
- private adminSettingsService: AdminSettingsService
+ private adminSettingsService: AdminSettingsService,
+ private datasetService: DatasetService,
+ @Optional() @Host() private parent: DatasetDetailComponent,
+ private modal: NzModalService
) {
this.adminSettingsService
.getSetting("single_file_upload_max_size_mib")
@@ -56,42 +65,163 @@ export class FilesUploaderComponent {
.subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
}
- hideBanner() {
+ private markForceRestart(item: FileUploadItem): void {
+ // uploader should pass restart=true on the backend init call when this flag is set
+ item.restart = true;
+ }
+
+ private askResumeOrSkip(
+ item: FileUploadItem,
+ showForAll: boolean
+ ): Promise<"resume" | "resumeAll" | "restart" | "restartAll"> {
+ return new Promise(resolve => {
+ const fileName = item.name.split("/").pop() || item.name;
+ const sizeStr = formatSize(item.file.size);
+
+ const ref: NzModalRef = this.modal.create<ConflictingFileModalContentComponent, ConflictingFileModalData>({
+ nzTitle: "Conflicting File",
+ nzMaskClosable: false,
+ nzClosable: false,
+ nzContent: ConflictingFileModalContentComponent,
+ nzData: {
+ fileName,
+ path: item.name,
+ size: sizeStr,
+ },
+ nzFooter: [
+ ...(showForAll
+ ? [
+ {
+ label: "Restart For All",
+ onClick: () => {
+ resolve("restartAll");
+ ref.destroy();
+ },
+ },
+ {
+ label: "Resume For All",
+ onClick: () => {
+ resolve("resumeAll");
+ ref.destroy();
+ },
+ },
+ ]
+ : []),
+ {
+ label: "Restart",
+ onClick: () => {
+ resolve("restart");
+ ref.destroy();
+ },
+ },
+ {
+ label: "Resume",
+ type: "primary",
+ onClick: () => {
+ resolve("resume");
+ ref.destroy();
+ },
+ },
+ ],
+ });
+ });
+ }
+
+ private async resolveConflicts(items: FileUploadItem[], activePaths: string[]): Promise<FileUploadItem[]> {
+ const active = new Set(activePaths ?? []);
+ const isConflict = (p: string) => active.has(p) || active.has(encodeURIComponent(p));
+
+ const showForAll = items.length > 1;
+
+ let mode: "ask" | "resumeAll" | "restartAll" = "ask";
+ const out: FileUploadItem[] = [];
+
+ await items.reduce<Promise<void>>(async (chain, item) => {
+ await chain;
+
+ if (!isConflict(item.name)) {
+ out.push(item);
+ return;
+ }
+
+ if (mode === "resumeAll") {
+ out.push(item);
+ return;
+ }
+
+ if (mode === "restartAll") {
+ this.markForceRestart(item);
+ out.push(item);
+ return;
+ }
+
+ const choice = await this.askResumeOrSkip(item, showForAll);
+
+ if (choice === "resume") out.push(item);
+
+ if (choice === "resumeAll") {
+ mode = "resumeAll";
+ out.push(item);
+ }
+
+ if (choice === "restart") {
+ this.markForceRestart(item);
+ out.push(item);
+ }
+
+ if (choice === "restartAll") {
+ mode = "restartAll";
+ this.markForceRestart(item);
+ out.push(item);
+ }
+ }, Promise.resolve());
+
+ return out;
+ }
+
+ hideBanner(): void {
this.fileUploadingFinished = false;
}
- showFileUploadBanner(bannerType: "error" | "success" | "info" | "warning", bannerMessage: string) {
+ showFileUploadBanner(bannerType: "error" | "success" | "info" | "warning", bannerMessage: string): void {
this.fileUploadingFinished = true;
this.fileUploadBannerType = bannerType;
this.fileUploadBannerMessage = bannerMessage;
}
- public fileDropped(files: NgxFileDropEntry[]) {
- // this promise create the FileEntry from each of the NgxFileDropEntry
- // this filePromises is used to ensure the atomicity of file upload
+ private getOwnerAndName(): { ownerEmail: string; datasetName: string } {
+ return {
+ ownerEmail: this.parent?.ownerEmail ?? "",
+ datasetName: this.parent?.datasetName ?? "",
+ };
+ }
+
+ public fileDropped(files: NgxFileDropEntry[]): void {
const filePromises = files.map(droppedFile => {
return new Promise<FileUploadItem | null>((resolve, reject) => {
if (droppedFile.fileEntry.isFile) {
const fileEntry = droppedFile.fileEntry as FileSystemFileEntry;
- fileEntry.file(file => {
- // Check the file size here
- if (file.size > this.singleFileUploadMaxSizeMiB * 1024 * 1024) {
- // If the file is too large, reject the promise
- this.notificationService.error(
- `File ${file.name}'s size exceeds the maximum limit of ${this.singleFileUploadMaxSizeMiB}MiB.`
- );
- reject(null);
- } else {
- // If the file size is within the limit, proceed
+ fileEntry.file(
+ file => {
+ if (file.size > this.singleFileUploadMaxSizeMiB * 1024 * 1024) {
+ this.notificationService.error(
+ `File ${file.name}'s size exceeds the maximum limit of ${this.singleFileUploadMaxSizeMiB}MiB.`
+ );
+ reject(null);
+ return;
+ }
+
resolve({
- file: file,
+ file,
name: droppedFile.relativePath,
description: "",
uploadProgress: 0,
isUploadingFlag: false,
+ restart: false,
});
- }
- }, reject);
+ },
+ err => reject(err)
+ );
} else {
resolve(null);
}
@@ -99,31 +229,33 @@ export class FilesUploaderComponent {
});
Promise.allSettled(filePromises)
- .then(results => {
- // once all FileUploadItems are created/some of them are rejected, enter this block
+ .then(async results => {
+ const { ownerEmail, datasetName } = this.getOwnerAndName();
+
+ const activePathsPromise =
+ ownerEmail && datasetName
+ ? firstValueFrom(this.datasetService.listMultipartUploads(ownerEmail, datasetName)).catch(() => [])
+ : [];
+
+ const activePaths = await activePathsPromise;
const successfulUploads = results
- .filter((result): result is PromiseFulfilledResult<FileUploadItem | null> => result.status === "fulfilled")
- .map(result => result.value)
+ .filter((r): r is PromiseFulfilledResult<FileUploadItem | null> => r.status === "fulfilled")
+ .map(r_1 => r_1.value)
.filter((item): item is FileUploadItem => item !== null);
-
- if (successfulUploads.length > 0) {
- // successfulUploads.forEach(fileUploadItem => {
- // this.addFileToNewUploadsFileTree(fileUploadItem.name, fileUploadItem);
- // });
- const successMessage = `${successfulUploads.length} file${successfulUploads.length > 1 ? "s" : ""} selected successfully!`;
- this.showFileUploadBanner("success", successMessage);
+ const filteredUploads = await this.resolveConflicts(successfulUploads, activePaths);
+ if (filteredUploads.length > 0) {
+ const msg = `${filteredUploads.length} file${filteredUploads.length > 1 ? "s" : ""} selected successfully!`;
+ this.showFileUploadBanner("success", msg);
}
-
const failedCount = results.length - successfulUploads.length;
if (failedCount > 0) {
const errorMessage = `${failedCount} file${failedCount > 1 ? "s" : ""} failed to be selected.`;
this.showFileUploadBanner("error", errorMessage);
}
-
- this.uploadedFiles.emit(successfulUploads);
+ this.uploadedFiles.emit(filteredUploads);
})
.catch(error => {
- this.showFileUploadBanner("error", `Unexpected error:
${error.message}`);
+ this.showFileUploadBanner("error", `Unexpected error: ${error?.message
?? error}`);
});
}
}
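
For context, the uploader now cross-checks each dropped file against the file paths that already have an in-progress multipart upload (returned by listMultipartUploads) before emitting them. The resolveConflicts helper invoked above is part of this component and is not reproduced here; the sketch below only illustrates the general shape of such a check. The confirmResumeOrRestart callback is a hypothetical stand-in for the conflicting-file modal added in this commit.

  // Sketch only: split dropped files into conflicting / non-conflicting sets,
  // then ask the user whether each conflicting file should resume, restart, or be skipped.
  interface DroppedUpload {
    name: string;      // relative path inside the dataset
    restart: boolean;  // true = discard the existing session and start over
  }

  async function resolveConflictsSketch(
    uploads: DroppedUpload[],
    activePaths: string[],
    confirmResumeOrRestart: (name: string) => Promise<"resume" | "restart" | "skip">
  ): Promise<DroppedUpload[]> {
    const active = new Set(activePaths);
    const resolved: DroppedUpload[] = [];
    for (const item of uploads) {
      if (!active.has(item.name)) {
        resolved.push(item); // no in-progress session for this path: upload normally
        continue;
      }
      const choice = await confirmResumeOrRestart(item.name);
      if (choice === "skip") {
        continue; // leave the existing session untouched and drop this file
      }
      resolved.push({ ...item, restart: choice === "restart" });
    }
    return resolved;
  }
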
diff --git
a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
index 53a3c67391..f7eb492a9c 100644
---
a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
+++
b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
@@ -103,6 +103,8 @@ export class DatasetDetailComponent implements OnInit {
versionName: string = "";
isCreatingVersion: boolean = false;
+ public activeMultipartFilePaths: string[] = [];
+
// List of upload tasks – each task tracked by its filePath
public uploadTasks: Array<
MultipartUploadProgress & {
@@ -426,7 +428,8 @@ export class DatasetDetailComponent implements OnInit {
file.name,
file.file,
this.chunkSizeMiB * 1024 * 1024,
- this.maxConcurrentChunks
+ this.maxConcurrentChunks,
+ file.restart
)
.pipe(untilDestroyed(this))
.subscribe({
@@ -452,15 +455,24 @@ export class DatasetDetailComponent implements OnInit {
}
}
},
- error: () => {
+ error: (res: unknown) => {
+ const err = res as HttpErrorResponse;
+
+ if (err?.status === HttpStatusCode.Conflict) {
+ this.notificationService.error(
+ "Upload blocked (409). Another upload is likely in
progress for this file (another tab/browser), or the server is finalizing a
previous upload. Please retry in a moment."
+ );
+ } else {
+ this.notificationService.error("Upload failed. Please
retry.");
+ }
// Handle upload error
const taskIndex = this.uploadTasks.findIndex(t => t.filePath
=== file.name);
if (taskIndex !== -1) {
this.uploadTasks[taskIndex] = {
...this.uploadTasks[taskIndex],
- percentage: 100,
- status: "aborted",
+ percentage: this.uploadTasks[taskIndex].percentage ?? 0, // keep the last known progress instead of jumping to 100%
+ status: "failed",
};
this.scheduleHide(taskIndex);
}
@@ -591,7 +603,6 @@ export class DatasetDetailComponent implements OnInit {
},
error: (res: unknown) => {
const err = res as HttpErrorResponse;
-
// Already gone, treat as done
if (err.status === 404) {
done();
@@ -612,13 +623,17 @@ export class DatasetDetailComponent implements OnInit {
abortWithRetry(0);
- this.uploadTasks = this.uploadTasks.filter(t => t.filePath !==
task.filePath);
+ const idx = this.uploadTasks.findIndex(t => t.filePath === task.filePath);
+ if (idx !== -1) {
+ this.uploadTasks[idx] = { ...this.uploadTasks[idx], status: "aborted" };
+ this.scheduleHide(idx);
+ }
}
- getUploadStatus(status: "initializing" | "uploading" | "finished" |
"aborted"): "active" | "exception" | "success" {
+ getUploadStatus(status: MultipartUploadProgress["status"]): "active" |
"exception" | "success" {
return status === "uploading" || status === "initializing"
? "active"
- : status === "aborted"
+ : status === "aborted" || status === "failed"
? "exception"
: "success";
}
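
The distinction between "failed" and "aborted" matters for resumption: a failed task keeps its server-side session, so retrying the same file with restart = false lets the backend report only the parts that are still missing. A minimal sketch of such a retry handler is below; retryUpload and the uploadOneFile callback are hypothetical names used for illustration, not methods of this component.

  // Sketch only: re-run failed uploads without discarding their existing sessions.
  function retryUpload(
    tasks: { filePath: string; status: string }[],
    files: { name: string; restart: boolean }[],
    uploadOneFile: (file: { name: string; restart: boolean }) => void
  ): void {
    for (const task of tasks) {
      if (task.status !== "failed") continue;
      const file = files.find(f => f.name === task.filePath);
      if (file) {
        uploadOneFile({ ...file, restart: false }); // resume: only the missing parts are re-sent
      }
    }
  }
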
diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
index 16c580be27..386c269da0 100644
--- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
+++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts
@@ -51,7 +51,7 @@ export const DATASET_GET_OWNERS_URL = DATASET_BASE_URL +
"/user-dataset-owners";
export interface MultipartUploadProgress {
filePath: string;
percentage: number;
- status: "initializing" | "uploading" | "finished" | "aborted";
+ status: "initializing" | "uploading" | "finished" | "aborted" | "failed";
uploadSpeed?: number; // bytes per second
estimatedTimeRemaining?: number; // seconds
totalTime?: number; // total seconds taken
@@ -154,7 +154,8 @@ export class DatasetService {
filePath: string,
file: File,
partSize: number,
- concurrencyLimit: number
+ concurrencyLimit: number,
+ restart: boolean
): Observable<MultipartUploadProgress> {
const partCount = Math.ceil(file.size / partSize);
@@ -162,6 +163,7 @@ export class DatasetService {
// Track upload progress (bytes) for each part independently
const partProgress = new Map<number, number>();
+ let baselineUploaded = 0;
// Progress tracking state
let startTime: number | null = null;
const speedSamples: number[] = [];
@@ -193,7 +195,8 @@ export class DatasetService {
}
lastUpdateTime = now;
- const currentSpeed = elapsed > 0 ? totalUploaded / elapsed : 0;
+ const sessionUploaded = Math.max(0, totalUploaded - baselineUploaded);
+ const currentSpeed = elapsed > 0 ? sessionUploaded / elapsed : 0;
speedSamples.push(currentSpeed);
if (speedSamples.length > 5) {
speedSamples.shift();
@@ -232,48 +235,42 @@ export class DatasetService {
.set("datasetName", datasetName)
.set("filePath", encodeURIComponent(filePath))
.set("fileSizeBytes", file.size.toString())
- .set("partSizeBytes", partSize.toString());
+ .set("partSizeBytes", partSize.toString())
+ .set("restart", restart);
- const init$ = this.http.post<{}>(
+ const init$ = this.http.post<{ missingParts: number[];
completedPartsCount: number }>(
`${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
{},
{ params: initParams }
);
- const initWithAbortRetry$ = init$.pipe(
- catchError((res: unknown) => {
- const err = res as HttpErrorResponse;
- if (err.status !== 409) {
- return throwError(() => err);
- }
-
- // Init failed because a session already exists. Abort it and retry
init once.
- return this.finalizeMultipartUpload(ownerEmail, datasetName,
filePath, true).pipe(
- // best-effort abort; if abort itself fails, let the re-init decide
- catchError(() => EMPTY),
- switchMap(() => init$)
- );
- })
- );
-
- const subscription = initWithAbortRetry$
+ const subscription = init$
.pipe(
switchMap(initResp => {
- // Notify UI that upload is starting
+ const missingParts = (initResp?.missingParts ?? []).slice();
+ const completedPartsCount = initResp?.completedPartsCount ?? 0;
+
+ const missingBytes = missingParts.reduce((sum, partNumber) => {
+ const start = (partNumber - 1) * partSize;
+ const end = Math.min(start + partSize, file.size);
+ return sum + (end - start);
+ }, 0);
+
+ baselineUploaded = file.size - missingBytes;
+ const baselinePct = partCount > 0 ?
Math.round((completedPartsCount / partCount) * 100) : 0;
+
observer.next({
filePath,
- percentage: 0,
+ percentage: baselinePct,
status: "initializing",
uploadSpeed: 0,
estimatedTimeRemaining: 0,
totalTime: 0,
});
-
// 2. Upload each part to /multipart-upload/part using
XMLHttpRequest
- return from(Array.from({ length: partCount }, (_, i) => i)).pipe(
- mergeMap(index => {
- const partNumber = index + 1;
- const start = index * partSize;
+ return from(missingParts).pipe(
+ mergeMap(partNumber => {
+ const start = (partNumber - 1) * partSize;
const end = Math.min(start + partSize, file.size);
const chunk = file.slice(start, end);
@@ -284,7 +281,7 @@ export class DatasetService {
if (event.lengthComputable) {
partProgress.set(partNumber, event.loaded);
- let totalUploaded = 0;
+ let totalUploaded = baselineUploaded; // start from bytes already confirmed in earlier sessions
partProgress.forEach(bytes => {
totalUploaded += bytes;
});
@@ -306,7 +303,7 @@ export class DatasetService {
// Mark part as fully uploaded
partProgress.set(partNumber, chunk.size);
- let totalUploaded = 0;
+ let totalUploaded = baselineUploaded;
partProgress.forEach(bytes => {
totalUploaded += bytes;
});
@@ -385,7 +382,7 @@ export class DatasetService {
}),
catchError((error: unknown) => {
// On error, compute best-effort percentage from bytes we've
seen
- let totalUploaded = 0;
+ let totalUploaded = baselineUploaded;
partProgress.forEach(bytes => {
totalUploaded += bytes;
});
@@ -394,29 +391,13 @@ export class DatasetService {
observer.next({
filePath,
percentage,
- status: "aborted",
+ status: "failed",
uploadSpeed: 0,
estimatedTimeRemaining: 0,
totalTime: getTotalTime(),
});
- // Abort on backend
- const abortParams = new HttpParams()
- .set("type", "abort")
- .set("ownerEmail", ownerEmail)
- .set("datasetName", datasetName)
- .set("filePath", encodeURIComponent(filePath));
-
- return this.http
- .post(
-
`${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`,
- {},
- { params: abortParams }
- )
- .pipe(
- switchMap(() => throwError(() => error)),
- catchError(() => throwError(() => error))
- );
+ return throwError(() => error);
})
);
})
@@ -429,6 +410,16 @@ export class DatasetService {
});
}
+ public listMultipartUploads(ownerEmail: string, datasetName: string):
Observable<string[]> {
+ const params = new HttpParams().set("type", "list").set("ownerEmail",
ownerEmail).set("datasetName", datasetName);
+
+ return this.http
+ .post<{
+ filePaths: string[];
+ }>(`${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, {}, { params })
+ .pipe(map(res => res?.filePaths ?? []));
+ }
+
public finalizeMultipartUpload(
ownerEmail: string,
datasetName: string,
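
To make the resume arithmetic in the init handler above concrete: the backend returns the part numbers that still need to be uploaded, and the client derives both the byte baseline and the starting percentage from them. The helper below is a standalone sketch of that calculation under the same assumptions (fixed part size, last part possibly shorter); computeResumeBaseline is a hypothetical name and is not exported by DatasetService.

  // Sketch only: derive resume baselines from the init response.
  function computeResumeBaseline(
    fileSize: number,
    partSize: number,
    missingParts: number[],
    completedPartsCount: number
  ): { baselineUploadedBytes: number; baselinePercentage: number } {
    const partCount = Math.ceil(fileSize / partSize);
    // Bytes still to send: every missing part is partSize, except possibly the last part.
    const missingBytes = missingParts.reduce((sum, partNumber) => {
      const start = (partNumber - 1) * partSize;
      const end = Math.min(start + partSize, fileSize);
      return sum + (end - start);
    }, 0);
    return {
      baselineUploadedBytes: fileSize - missingBytes,
      baselinePercentage: partCount > 0 ? Math.round((completedPartsCount / partCount) * 100) : 0,
    };
  }

  // Example: a 25 MiB file with 10 MiB parts and only part 3 missing resumes at 20 MiB uploaded, 67%.
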
diff --git a/frontend/src/app/dashboard/type/dashboard-file.interface.ts
b/frontend/src/app/dashboard/type/dashboard-file.interface.ts
index 3dd5925e34..fa394904c2 100644
--- a/frontend/src/app/dashboard/type/dashboard-file.interface.ts
+++ b/frontend/src/app/dashboard/type/dashboard-file.interface.ts
@@ -50,6 +50,7 @@ export interface FileUploadItem {
description: string;
uploadProgress: number;
isUploadingFlag: boolean;
+ restart: boolean;
}
/**
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index f4728279e3..a2de095688 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -289,6 +289,7 @@ CREATE TABLE IF NOT EXISTS dataset_upload_session
num_parts_requested INT NOT NULL,
file_size_bytes BIGINT NOT NULL,
part_size_bytes BIGINT NOT NULL,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (uid, did, file_path),
diff --git a/sql/updates/20.sql b/sql/updates/20.sql
new file mode 100644
index 0000000000..058abb1d48
--- /dev/null
+++ b/sql/updates/20.sql
@@ -0,0 +1,37 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+SET search_path TO texera_db, public;
+
+BEGIN;
+
+-- Step 1: Add the column (no default yet)
+ALTER TABLE dataset_upload_session
+ ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ;
+
+-- Step 2: Add the default for future inserts
+ALTER TABLE dataset_upload_session
+ ALTER COLUMN created_at SET DEFAULT now();
+
+-- Step 3: Backfill existing rows, then enforce NOT NULL
+UPDATE dataset_upload_session SET created_at = now() WHERE created_at IS NULL;
+
+ALTER TABLE dataset_upload_session
+ ALTER COLUMN created_at SET NOT NULL;
+
+COMMIT;