This is an automated email from the ASF dual-hosted git repository.
jiadongb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 989e4d0e8b feat(dataset): allow datasets to have the same name across
different users (#3715)
989e4d0e8b is described below
commit 989e4d0e8bc98060e0fb05cb0c42b90cc6619ac2
Author: Andy Zhang <[email protected]>
AuthorDate: Fri Oct 3 06:02:06 2025 +0100
feat(dataset): allow datasets to have the same name across different users
(#3715)
resolves #3683
The detailed design is described here:
https://github.com/apache/texera/issues/3683#issuecomment-3231875282
Changes in this PR:
1. Add a `repository_name` column to the dataset table.
2. Use a generated repository name, in the format `dataset-{did}`, to store
datasets in LakeFS and S3 (see the sketch after this list).
3. Apply the corresponding code changes:
a. Use the repository name in the LakeFS interfaces.
b. Update the web application service queries.
c. Update `FileResolver`.
4. Update the `/multipart-upload` endpoint of the dataset resource: add a new
`ownerEmail` parameter, since dataset names are no longer globally unique.
5. Add test cases for the new feature.
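A minimal Scala sketch of the new convention (the helper names and import
paths below are assumptions for illustration; the lookup mirrors the query
added to `DatasetResource.scala`):

import org.jooq.DSLContext
import edu.uci.ics.texera.dao.jooq.generated.Tables.{DATASET, USER}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.Dataset

// Repository names are derived from the dataset's primary key, so they stay
// unique in LakeFS/S3 even when two users choose the same dataset name.
def repositoryNameFor(did: Int): String = s"dataset-$did"

// Since dataset names are only unique per owner, /multipart-upload callers
// now identify a dataset by (ownerEmail, datasetName) instead of name alone.
def findDataset(ctx: DSLContext, ownerEmail: String, datasetName: String): Dataset =
  ctx
    .select(DATASET.fields: _*)
    .from(DATASET)
    .leftJoin(USER)
    .on(USER.UID.eq(DATASET.OWNER_UID))
    .where(USER.EMAIL.eq(ownerEmail))
    .and(DATASET.NAME.eq(datasetName))
    .fetchOneInto(classOf[Dataset])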
---------
Co-authored-by: ali risheh <[email protected]>
---
.../dashboard/DatasetSearchQueryBuilder.scala | 8 +-
.../resource/dashboard/UnifiedResourceSchema.scala | 4 +-
.../web/resource/dashboard/hub/HubResource.scala | 2 +-
core/build.sbt | 2 +
.../edu/uci/ics/amber/config/StorageConfig.scala | 6 +-
core/file-service/build.sbt | 6 +-
.../texera/service/resource/DatasetResource.scala | 185 ++++++++++++++-------
.../edu/uci/ics/texera/service/MockLakeFS.scala | 124 ++++++++++++++
.../service/resource/DatasetResourceSpec.scala | 183 ++++++++++++++++++++
.../dataset-detail.component.ts | 4 +
.../service/user/dataset/dataset.service.ts | 19 ++-
core/scripts/sql/texera_ddl.sql | 1 +
core/scripts/sql/updates/15.sql | 38 +++++
.../uci/ics/amber/core/storage/FileResolver.scala | 8 +-
.../core/storage/model/DatasetFileDocument.scala | 18 +-
.../ics/amber/core/storage/model/OnDataset.scala | 2 +-
.../core/storage/util/LakeFSStorageClient.scala | 16 +-
.../uci/ics/amber/storage/FileResolverSpec.scala | 5 +-
.../k8s/texera-helmchart/files/texera_ddl.sql | 1 +
19 files changed, 536 insertions(+), 96 deletions(-)
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/DatasetSearchQueryBuilder.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/DatasetSearchQueryBuilder.scala
index 5587a46837..469cfbe5eb 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/DatasetSearchQueryBuilder.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/DatasetSearchQueryBuilder.scala
@@ -44,6 +44,7 @@ object DatasetSearchQueryBuilder extends SearchQueryBuilder
with LazyLogging {
creationTime = DATASET.CREATION_TIME,
ownerId = DATASET.OWNER_UID,
did = DATASET.DID,
+ repositoryName = DATASET.REPOSITORY_NAME,
isDatasetPublic = DATASET.IS_PUBLIC,
isDatasetDownloadable = DATASET.IS_DOWNLOADABLE,
datasetUserAccess = DATASET_USER_ACCESS.PRIVILEGE
@@ -123,11 +124,14 @@ object DatasetSearchQueryBuilder extends
SearchQueryBuilder with LazyLogging {
var size = 0L
try {
- size = LakeFSStorageClient.retrieveRepositorySize(dataset.getName)
+ size =
LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
} catch {
case e: io.lakefs.clients.sdk.ApiException =>
// Treat all LakeFS ApiException as mismatch (repository not found,
being deleted, or any fatal error)
- logger.error(s"LakeFS ApiException for dataset '${dataset.getName}':
${e.getMessage}", e)
+ logger.error(
+ s"LakeFS ApiException for dataset repository
'${dataset.getRepositoryName}': ${e.getMessage}",
+ e
+ )
return null
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/UnifiedResourceSchema.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/UnifiedResourceSchema.scala
index a97fbe71ba..09fdacc55f 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/UnifiedResourceSchema.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/UnifiedResourceSchema.scala
@@ -68,6 +68,7 @@ object UnifiedResourceSchema {
projectColor: Field[String] = DSL.inline(""),
did: Field[Integer] = DSL.cast(null, classOf[Integer]),
datasetStoragePath: Field[String] = DSL.cast(null, classOf[String]),
+ repositoryName: Field[String] = DSL.inline(""),
isDatasetPublic: Field[java.lang.Boolean] = DSL.cast(null,
classOf[java.lang.Boolean]),
isDatasetDownloadable: Field[java.lang.Boolean] = DSL.cast(null,
classOf[java.lang.Boolean]),
datasetUserAccess: Field[PrivilegeEnum] =
DSL.castNull(classOf[PrivilegeEnum])
@@ -91,6 +92,7 @@ object UnifiedResourceSchema {
projectColor -> projectColor.as("color"),
did -> did.as("did"),
datasetStoragePath -> datasetStoragePath.as("dataset_storage_path"),
+ repositoryName -> repositoryName.as("repository_name"),
isDatasetPublic -> isDatasetPublic.as("is_dataset_public"),
isDatasetDownloadable ->
isDatasetDownloadable.as("is_dataset_downloadable"),
datasetUserAccess -> datasetUserAccess.as("user_dataset_access")
@@ -133,6 +135,7 @@ object UnifiedResourceSchema {
* Attributes specific to datasets:
* - `did`: Dataset ID, as an `Integer`.
* - `datasetStoragePath`: The storage path of the dataset, as a `String`.
+ * - `repositoryName`: The name of the repository where the dataset is
stored, as a `String`.
* - `isDatasetPublic`: Indicates if the dataset is public, as a `Boolean`.
* - `isDatasetDownloadable`: Indicates if the dataset is downloadable, as a
`Boolean`.
* - `datasetUserAccess`: Access privileges for the dataset, as a
`PrivilegeEnum`
@@ -163,5 +166,4 @@ class UnifiedResourceSchema private (
}
ret
}
-
}
diff --git
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/hub/HubResource.scala
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/hub/HubResource.scala
index e62a95e343..50634b14a9 100644
---
a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/hub/HubResource.scala
+++
b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/hub/HubResource.scala
@@ -326,7 +326,7 @@ object HubResource {
dataset = dataset,
accessPrivilege = datasetAccess.getPrivilege,
ownerEmail = ownerEmail,
- size = LakeFSStorageClient.retrieveRepositorySize(dataset.getName)
+ size =
LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
)
}
.toList
diff --git a/core/build.sbt b/core/build.sbt
index dc4fa0e823..3e2e150ac9 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -51,6 +51,8 @@ lazy val ComputingUnitManagingService = (project in
file("computing-unit-managin
)
lazy val FileService = (project in file("file-service"))
.dependsOn(WorkflowCore, Auth, Config)
+ .configs(Test)
+ .dependsOn(DAO % "test->test") // test scope dependency
.settings(
dependencyOverrides ++= Seq(
// override it as io.dropwizard 4 require 2.16.1 or higher
diff --git
a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
index 90d2c985e9..fd369fbb5b 100644
--- a/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
+++ b/core/config/src/main/scala/edu/uci/ics/amber/config/StorageConfig.scala
@@ -65,7 +65,8 @@ object StorageConfig {
conf.getInt("storage.iceberg.table.commit.retry.max-wait-ms")
// LakeFS specifics
- val lakefsEndpoint: String = conf.getString("storage.lakefs.endpoint")
+ // lakefsEndpoint is a var because in test we need to override it to point
to the test container
+ var lakefsEndpoint: String = conf.getString("storage.lakefs.endpoint")
val lakefsApiSecret: String =
conf.getString("storage.lakefs.auth.api-secret")
val lakefsUsername: String = conf.getString("storage.lakefs.auth.username")
val lakefsPassword: String = conf.getString("storage.lakefs.auth.password")
@@ -73,7 +74,8 @@ object StorageConfig {
val lakefsBucketName: String =
conf.getString("storage.lakefs.block-storage.bucket-name")
// S3 specifics
- val s3Endpoint: String = conf.getString("storage.s3.endpoint")
+ // s3Endpoint is a var because in test we need to override it to point to
the test container
+ var s3Endpoint: String = conf.getString("storage.s3.endpoint")
val s3Region: String = conf.getString("storage.s3.region")
val s3Username: String = conf.getString("storage.s3.auth.username")
val s3Password: String = conf.getString("storage.s3.auth.password")
diff --git a/core/file-service/build.sbt b/core/file-service/build.sbt
index 27ca7a5fb1..236489e5c2 100644
--- a/core/file-service/build.sbt
+++ b/core/file-service/build.sbt
@@ -54,6 +54,7 @@ Compile / scalacOptions ++= Seq(
val dropwizardVersion = "4.0.7"
val mockitoVersion = "5.4.0"
val assertjVersion = "3.24.2"
+val testcontainersVersion = "0.43.0"
/////////////////////////////////////////////////////////////////////////////
// Test-related Dependencies
@@ -65,7 +66,10 @@ libraryDependencies ++= Seq(
"io.dropwizard" % "dropwizard-testing" % dropwizardVersion % Test, //
Dropwizard Testing
"org.mockito" % "mockito-core" % mockitoVersion % Test, //
Mockito for mocking
"org.assertj" % "assertj-core" % assertjVersion % Test, //
AssertJ for assertions
- "com.novocode" % "junit-interface" % "0.11" % Test // SBT
interface for JUnit
+ "com.novocode" % "junit-interface" % "0.11" % Test, // SBT
interface for JUnit
+ "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersVersion %
Test, // Testcontainers ScalaTest integration
+ "com.dimafeng" %% "testcontainers-scala-postgresql" % testcontainersVersion
% Test, // PostgreSQL Testcontainer Scala integration
+ "com.dimafeng" %% "testcontainers-scala-minio" % testcontainersVersion %
Test, // MinIO Testcontainer Scala integration
)
/////////////////////////////////////////////////////////////////////////////
diff --git
a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
index 5e7ee3964d..97f2209bec 100644
---
a/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
+++
b/core/file-service/src/main/scala/edu/uci/ics/texera/service/resource/DatasetResource.scala
@@ -224,7 +224,7 @@ class DatasetResource {
getOwner(ctx, did).getEmail,
userAccessPrivilege,
isOwner,
- LakeFSStorageClient.retrieveRepositorySize(targetDataset.getName)
+
LakeFSStorageClient.retrieveRepositorySize(targetDataset.getRepositoryName)
)
}
@@ -239,7 +239,6 @@ class DatasetResource {
withTransaction(context) { ctx =>
val uid = user.getUid
- val datasetDao: DatasetDao = new DatasetDao(ctx.configuration())
val datasetUserAccessDao: DatasetUserAccessDao = new
DatasetUserAccessDao(ctx.configuration())
val datasetName = request.datasetName
@@ -247,22 +246,25 @@ class DatasetResource {
val isDatasetPublic = request.isDatasetPublic
val isDatasetDownloadable = request.isDatasetDownloadable
- // Check if a dataset with the same name already exists
- if (!datasetDao.fetchByName(datasetName).isEmpty) {
- throw new BadRequestException("Dataset with the same name already
exists")
- }
-
- // Initialize the repository in LakeFS
+ // validate dataset name
try {
- LakeFSStorageClient.initRepo(datasetName)
+ validateDatasetName(datasetName)
} catch {
- case e: Exception =>
- throw new WebApplicationException(
- s"Failed to create the dataset: ${e.getMessage}"
- )
+ case e: IllegalArgumentException =>
+ throw new BadRequestException(e.getMessage)
}
- // Insert the dataset into the database
+ // Check if a dataset with the same name already exists
+ val existingDatasets = context
+ .selectFrom(DATASET)
+ .where(DATASET.OWNER_UID.eq(uid))
+ .and(DATASET.NAME.eq(datasetName))
+ .fetch()
+ if (!existingDatasets.isEmpty) {
+ throw new BadRequestException("Dataset with the same name already
exists")
+ }
+
+ // insert the dataset into the database
val dataset = new Dataset()
dataset.setName(datasetName)
dataset.setDescription(datasetDescription)
@@ -270,12 +272,32 @@ class DatasetResource {
dataset.setIsDownloadable(isDatasetDownloadable)
dataset.setOwnerUid(uid)
+ // insert record and get created dataset with did
val createdDataset = ctx
.insertInto(DATASET)
.set(ctx.newRecord(DATASET, dataset))
.returning()
.fetchOne()
+ // Initialize the repository in LakeFS
+ val repositoryName = s"dataset-${createdDataset.getDid}"
+ try {
+ LakeFSStorageClient.initRepo(repositoryName)
+ } catch {
+ case e: Exception =>
+ ctx
+ .deleteFrom(DATASET)
+ .where(DATASET.DID.eq(createdDataset.getDid))
+ .execute()
+ throw new WebApplicationException(
+ s"Failed to create the dataset: ${e.getMessage}"
+ )
+ }
+
+ // update repository name of the created dataset
+ createdDataset.setRepositoryName(repositoryName)
+ createdDataset.update()
+
// Insert the requester as the WRITE access user for this dataset
val datasetUserAccess = new DatasetUserAccess()
datasetUserAccess.setDid(createdDataset.getDid)
@@ -288,6 +310,7 @@ class DatasetResource {
createdDataset.getDid,
createdDataset.getOwnerUid,
createdDataset.getName,
+ createdDataset.getRepositoryName,
createdDataset.getIsPublic,
createdDataset.getIsDownloadable,
createdDataset.getDescription,
@@ -318,9 +341,10 @@ class DatasetResource {
val dataset = getDatasetByID(ctx, did)
val datasetName = dataset.getName
+ val repositoryName = dataset.getRepositoryName
// Check if there are any changes in LakeFS before creating a new version
- val diffs = LakeFSStorageClient.retrieveUncommittedObjects(repoName =
datasetName)
+ val diffs = LakeFSStorageClient.retrieveUncommittedObjects(repoName =
repositoryName)
if (diffs.isEmpty) {
throw new WebApplicationException(
@@ -345,7 +369,7 @@ class DatasetResource {
// Create a commit in LakeFS
val commit = LakeFSStorageClient.createCommit(
- repoName = datasetName,
+ repoName = repositoryName,
branch = "main",
commitMessage = s"Created dataset version: $newVersionName"
)
@@ -372,7 +396,7 @@ class DatasetResource {
.into(classOf[DatasetVersion])
// Retrieve committed file structure
- val fileNodes =
LakeFSStorageClient.retrieveObjectsOfVersion(datasetName, commit.getId)
+ val fileNodes =
LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, commit.getId)
DashboardDatasetVersion(
insertedVersion,
@@ -397,7 +421,7 @@ class DatasetResource {
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}
try {
- LakeFSStorageClient.deleteRepo(dataset.getName)
+ LakeFSStorageClient.deleteRepo(dataset.getRepositoryName)
} catch {
case e: Exception =>
throw new WebApplicationException(
@@ -407,8 +431,10 @@ class DatasetResource {
}
// delete the directory on S3
- if (S3StorageClient.directoryExists(StorageConfig.lakefsBucketName,
dataset.getName)) {
- S3StorageClient.deleteDirectory(StorageConfig.lakefsBucketName,
dataset.getName)
+ if (
+ S3StorageClient.directoryExists(StorageConfig.lakefsBucketName,
dataset.getRepositoryName)
+ ) {
+ S3StorageClient.deleteDirectory(StorageConfig.lakefsBucketName,
dataset.getRepositoryName)
}
// delete the dataset from the DB
@@ -466,7 +492,7 @@ class DatasetResource {
throw new
ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
val dataset = getDatasetByID(ctx, did)
- repoName = dataset.getName
+ repoName = dataset.getRepositoryName
filePath = URLDecoder.decode(encodedFilePath,
StandardCharsets.UTF_8.name)
// ---------- decide part-size & number-of-parts ----------
@@ -560,12 +586,12 @@ class DatasetResource {
@Path("/presign-download")
def getPresignedUrl(
@QueryParam("filePath") encodedUrl: String,
- @QueryParam("datasetName") datasetName: String,
+ @QueryParam("repositoryName") repositoryName: String,
@QueryParam("commitHash") commitHash: String,
@Auth user: SessionUser
): Response = {
val uid = user.getUid
- generatePresignedResponse(encodedUrl, datasetName, commitHash, uid)
+ generatePresignedResponse(encodedUrl, repositoryName, commitHash, uid)
}
@GET
@@ -573,32 +599,32 @@ class DatasetResource {
@Path("/presign-download-s3")
def getPresignedUrlWithS3(
@QueryParam("filePath") encodedUrl: String,
- @QueryParam("datasetName") datasetName: String,
+ @QueryParam("repositoryName") repositoryName: String,
@QueryParam("commitHash") commitHash: String,
@Auth user: SessionUser
): Response = {
val uid = user.getUid
- generatePresignedResponse(encodedUrl, datasetName, commitHash, uid)
+ generatePresignedResponse(encodedUrl, repositoryName, commitHash, uid)
}
@GET
@Path("/public-presign-download")
def getPublicPresignedUrl(
@QueryParam("filePath") encodedUrl: String,
- @QueryParam("datasetName") datasetName: String,
+ @QueryParam("repositoryName") repositoryName: String,
@QueryParam("commitHash") commitHash: String
): Response = {
- generatePresignedResponse(encodedUrl, datasetName, commitHash, null)
+ generatePresignedResponse(encodedUrl, repositoryName, commitHash, null)
}
@GET
@Path("/public-presign-download-s3")
def getPublicPresignedUrlWithS3(
@QueryParam("filePath") encodedUrl: String,
- @QueryParam("datasetName") datasetName: String,
+ @QueryParam("repositoryName") repositoryName: String,
@QueryParam("commitHash") commitHash: String
): Response = {
- generatePresignedResponse(encodedUrl, datasetName, commitHash, null)
+ generatePresignedResponse(encodedUrl, repositoryName, commitHash, null)
}
@DELETE
@@ -615,13 +641,13 @@ class DatasetResource {
if (!userHasWriteAccess(ctx, did, uid)) {
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}
- val datasetName = getDatasetByID(ctx, did).getName
+ val repositoryName = getDatasetByID(ctx, did).getRepositoryName
// Decode the file path
val filePath = URLDecoder.decode(encodedFilePath,
StandardCharsets.UTF_8.name())
// Try to initialize the repository in LakeFS
try {
- LakeFSStorageClient.deleteObject(datasetName, filePath)
+ LakeFSStorageClient.deleteObject(repositoryName, filePath)
} catch {
case e: Exception =>
throw new WebApplicationException(
@@ -638,8 +664,9 @@ class DatasetResource {
@Path("/multipart-upload")
@Consumes(Array(MediaType.APPLICATION_JSON))
def multipartUpload(
- @QueryParam("datasetName") datasetName: String,
@QueryParam("type") operationType: String,
+ @QueryParam("ownerEmail") ownerEmail: String,
+ @QueryParam("datasetName") datasetName: String,
@QueryParam("filePath") encodedUrl: String,
@QueryParam("uploadId") uploadId: Optional[String],
@QueryParam("numParts") numParts: Optional[Integer],
@@ -652,13 +679,20 @@ class DatasetResource {
val uid = user.getUid
withTransaction(context) { ctx =>
- val datasetDao = new DatasetDao(ctx.configuration())
- val datasets = datasetDao.fetchByName(datasetName).asScala.toList
- if (datasets.isEmpty || !userHasWriteAccess(ctx, datasets.head.getDid,
uid)) {
+ val dataset = context
+ .select(DATASET.fields: _*)
+ .from(DATASET)
+ .leftJoin(USER)
+ .on(USER.UID.eq(DATASET.OWNER_UID))
+ .where(USER.EMAIL.eq(ownerEmail))
+ .and(DATASET.NAME.eq(datasetName))
+ .fetchOneInto(classOf[Dataset])
+ if (dataset == null || !userHasWriteAccess(ctx, dataset.getDid, uid)) {
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}
// Decode the file path
+ val repositoryName = dataset.getRepositoryName
val filePath = URLDecoder.decode(encodedUrl,
StandardCharsets.UTF_8.name())
operationType.toLowerCase match {
@@ -668,7 +702,7 @@ class DatasetResource {
)
val presignedResponse =
LakeFSStorageClient.initiatePresignedMultipartUploads(
- datasetName,
+ repositoryName,
filePath,
numPartsValue
)
@@ -725,7 +759,7 @@ class DatasetResource {
// Complete the multipart upload with parts and physical address
val objectStats =
LakeFSStorageClient.completePresignedMultipartUploads(
- datasetName,
+ repositoryName,
filePath,
uploadIdValue,
partsList,
@@ -754,7 +788,7 @@ class DatasetResource {
// Abort the multipart upload
LakeFSStorageClient.abortPresignedMultipartUploads(
- datasetName,
+ repositoryName,
filePath,
uploadIdValue,
physicalAddress
@@ -832,7 +866,7 @@ class DatasetResource {
// Retrieve staged (uncommitted) changes from LakeFS
val dataset = getDatasetByID(ctx, did)
- val lakefsDiffs =
LakeFSStorageClient.retrieveUncommittedObjects(dataset.getName)
+ val lakefsDiffs =
LakeFSStorageClient.retrieveUncommittedObjects(dataset.getRepositoryName)
// Convert LakeFS Diff objects to our custom Diff case class
lakefsDiffs.map(d =>
@@ -860,13 +894,13 @@ class DatasetResource {
if (!userHasWriteAccess(ctx, did, uid)) {
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}
- val datasetName = getDatasetByID(ctx, did).getName
+ val repositoryName = getDatasetByID(ctx, did).getRepositoryName
// Decode the file path
val filePath = URLDecoder.decode(encodedFilePath,
StandardCharsets.UTF_8.name())
// Try to reset the file change in LakeFS
try {
- LakeFSStorageClient.resetObjectUploadOrDeletion(datasetName, filePath)
+ LakeFSStorageClient.resetObjectUploadOrDeletion(repositoryName,
filePath)
} catch {
case e: Exception =>
throw new WebApplicationException(
@@ -938,7 +972,7 @@ class DatasetResource {
dataset = dataset,
accessPrivilege = PrivilegeEnum.READ,
ownerEmail = ownerEmail,
- size = LakeFSStorageClient.retrieveRepositorySize(dataset.getName)
+ size =
LakeFSStorageClient.retrieveRepositorySize(dataset.getRepositoryName)
)
})
publicDatasets.forEach { publicDataset =>
@@ -948,7 +982,8 @@ class DatasetResource {
dataset = publicDataset.dataset,
ownerEmail = publicDataset.ownerEmail,
accessPrivilege = PrivilegeEnum.READ,
- size =
LakeFSStorageClient.retrieveRepositorySize(publicDataset.dataset.getName)
+ size =
+
LakeFSStorageClient.retrieveRepositorySize(publicDataset.dataset.getRepositoryName)
)
accessibleDatasets = accessibleDatasets :+ dashboardDataset
}
@@ -1009,7 +1044,7 @@ class DatasetResource {
Map(
(user.getEmail, dataset.getName, latestVersion.getName) ->
LakeFSStorageClient
- .retrieveObjectsOfVersion(dataset.getName,
latestVersion.getVersionHash)
+ .retrieveObjectsOfVersion(dataset.getRepositoryName,
latestVersion.getVersionHash)
)
)
.head
@@ -1070,13 +1105,14 @@ class DatasetResource {
// Retrieve dataset and version details
val datasetName = dataset.getName
+ val repositoryName = dataset.getRepositoryName
val versionHash = datasetVersion.getVersionHash
- val objects = LakeFSStorageClient.retrieveObjectsOfVersion(datasetName,
versionHash)
+ val objects =
LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, versionHash)
if (objects.isEmpty) {
return Response
.status(Response.Status.NOT_FOUND)
- .entity(s"No objects found in version $versionHash of repository
$datasetName")
+ .entity(s"No objects found in version $versionHash of repository
$repositoryName")
.build()
}
@@ -1087,7 +1123,7 @@ class DatasetResource {
try {
objects.foreach { obj =>
val filePath = obj.getPath
- val file = LakeFSStorageClient.getFileFromRepo(datasetName,
versionHash, filePath)
+ val file = LakeFSStorageClient.getFileFromRepo(repositoryName,
versionHash, filePath)
zipOut.putNextEntry(new ZipEntry(filePath))
Files.copy(Paths.get(file.toURI), zipOut)
@@ -1214,6 +1250,29 @@ class DatasetResource {
.fetchInto(classOf[String])
}
+ /**
+ * Validates the dataset name.
+ *
+ * Rules:
+ * - Must be at least 1 character long.
+ * - Only lowercase letters, numbers, underscores, and hyphens are allowed.
+ * - Cannot start with a hyphen.
+ *
+ * @param name The dataset name to validate.
+ * @throws IllegalArgumentException if the name is invalid.
+ */
+ private def validateDatasetName(name: String): Unit = {
+ val datasetNamePattern = "^[A-Za-z0-9_-]+$".r
+ if (!datasetNamePattern.matches(name)) {
+ throw new IllegalArgumentException(
+ s"Invalid dataset name: '$name'. " +
+ "Dataset names must be at least 1 character long and " +
+ "contain only lowercase letters, numbers, underscores, and hyphens,
" +
+ "and cannot start with a hyphen."
+ )
+ }
+ }
+
private def fetchDatasetVersions(ctx: DSLContext, did: Integer):
List[DatasetVersion] = {
ctx
.selectFrom(DATASET_VERSION)
@@ -1233,12 +1292,13 @@ class DatasetResource {
val dataset = getDashboardDataset(ctx, did, uid)
val datasetVersion = getDatasetVersionByID(ctx, dvid)
val datasetName = dataset.dataset.getName
+ val repositoryName = dataset.dataset.getRepositoryName
val ownerFileNode = DatasetFileNode
.fromLakeFSRepositoryCommittedObjects(
Map(
(dataset.ownerEmail, datasetName, datasetVersion.getName) ->
LakeFSStorageClient
- .retrieveObjectsOfVersion(datasetName,
datasetVersion.getVersionHash)
+ .retrieveObjectsOfVersion(repositoryName,
datasetVersion.getVersionHash)
)
)
.head
@@ -1259,17 +1319,17 @@ class DatasetResource {
private def generatePresignedResponse(
encodedUrl: String,
- datasetName: String,
+ repositoryName: String,
commitHash: String,
uid: Integer
): Response = {
- resolveDatasetAndPath(encodedUrl, datasetName, commitHash, uid) match {
+ resolveDatasetAndPath(encodedUrl, repositoryName, commitHash, uid) match {
case Left(errorResponse) =>
errorResponse
- case Right((resolvedDatasetName, resolvedCommitHash, resolvedFilePath))
=>
+ case Right((resolvedRepositoryName, resolvedCommitHash,
resolvedFilePath)) =>
val url = LakeFSStorageClient.getFilePresignedUrl(
- resolvedDatasetName,
+ resolvedRepositoryName,
resolvedCommitHash,
resolvedFilePath
)
@@ -1280,29 +1340,29 @@ class DatasetResource {
private def resolveDatasetAndPath(
encodedUrl: String,
- datasetName: String,
+ repositoryName: String,
commitHash: String,
uid: Integer
): Either[Response, (String, String, String)] = {
val decodedPathStr = URLDecoder.decode(encodedUrl,
StandardCharsets.UTF_8.name())
- (Option(datasetName), Option(commitHash)) match {
+ (Option(repositoryName), Option(commitHash)) match {
case (Some(_), None) | (None, Some(_)) =>
// Case 1: Only one parameter is provided (error case)
Left(
Response
.status(Response.Status.BAD_REQUEST)
.entity(
- "Both datasetName and commitHash must be provided together, or
neither should be provided."
+ "Both repositoryName and commitHash must be provided together,
or neither should be provided."
)
.build()
)
- case (Some(dsName), Some(commit)) =>
- // Case 2: datasetName and commitHash are provided, validate access
+ case (Some(repositoryName), Some(commit)) =>
+ // Case 2: repositoryName and commitHash are provided, validate access
val response = withTransaction(context) { ctx =>
val datasetDao = new DatasetDao(ctx.configuration())
- val datasets = datasetDao.fetchByName(dsName).asScala.toList
+ val datasets =
datasetDao.fetchByRepositoryName(repositoryName).asScala.toList
if (datasets.isEmpty || !userHasReadAccess(ctx,
datasets.head.getDid, uid))
throw new
ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
@@ -1311,17 +1371,18 @@ class DatasetResource {
// Standard read access check only - download restrictions handled
per endpoint
// Non-download operations (viewing) should work for all public
datasets
- (dsName, commit, decodedPathStr)
+ (repositoryName, commit, decodedPathStr)
}
Right(response)
case (None, None) =>
- // Case 3: Neither datasetName nor commitHash are provided, resolve
normally
+ // Case 3: Neither repositoryName nor commitHash are provided, resolve
normally
val response = withTransaction(context) { ctx =>
val fileUri = FileResolver.resolve(decodedPathStr)
val document =
DocumentFactory.openReadonlyDocument(fileUri).asInstanceOf[OnDataset]
val datasetDao = new DatasetDao(ctx.configuration())
- val datasets =
datasetDao.fetchByName(document.getDatasetName()).asScala.toList
+ val datasets =
+
datasetDao.fetchByRepositoryName(document.getRepositoryName()).asScala.toList
if (datasets.isEmpty || !userHasReadAccess(ctx,
datasets.head.getDid, uid))
throw new
ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
@@ -1331,7 +1392,7 @@ class DatasetResource {
// Non-download operations (viewing) should work for all public
datasets
(
- document.getDatasetName(),
+ document.getRepositoryName(),
document.getVersionHash(),
document.getFileRelativePath()
)
diff --git
a/core/file-service/src/test/scala/edu/uci/ics/texera/service/MockLakeFS.scala
b/core/file-service/src/test/scala/edu/uci/ics/texera/service/MockLakeFS.scala
new file mode 100644
index 0000000000..77560bbfd4
--- /dev/null
+++
b/core/file-service/src/test/scala/edu/uci/ics/texera/service/MockLakeFS.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.texera.service
+
+import com.dimafeng.testcontainers.{
+ ForAllTestContainer,
+ GenericContainer,
+ PostgreSQLContainer,
+ MinIOContainer,
+ MultipleContainers
+}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+import org.testcontainers.containers.Network
+import org.testcontainers.utility.DockerImageName
+import edu.uci.ics.amber.config.StorageConfig
+import edu.uci.ics.texera.service.util.S3StorageClient
+
+/**
+ * Trait to spin up a LakeFS + MinIO + Postgres stack using Testcontainers,
+ * similar to how MockTexeraDB uses EmbeddedPostgres.
+ */
+trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self:
Suite =>
+ // network for containers to communicate
+ val network: Network = Network.newNetwork()
+
+ // Postgres for LakeFS metadata
+ val postgres: PostgreSQLContainer = PostgreSQLContainer
+ .Def(
+ dockerImageName = DockerImageName.parse("postgres:15"),
+ databaseName = "texera_lakefs",
+ username = "texera_lakefs_admin",
+ password = "password"
+ )
+ .createContainer()
+ postgres.container.withNetwork(network)
+
+ // MinIO for object storage
+ val minio = MinIOContainer(
+ dockerImageName =
DockerImageName.parse("minio/minio:RELEASE.2025-02-28T09-55-16Z"),
+ userName = "texera_minio",
+ password = "password"
+ )
+ minio.container.withNetwork(network)
+
+ // LakeFS
+ val lakefsDatabaseURL: String =
+ s"postgresql://${postgres.username}:${postgres.password}" +
+
s"@${postgres.container.getNetworkAliases.get(0)}:5432/${postgres.databaseName}"
+
+ s"?sslmode=disable"
+ val lakefsUsername = "texera-admin"
+ val lakefsAccessKeyID = "AKIAIOSFOLKFSSAMPLES"
+ val lakefsSecretAccessKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
+ val lakefs = GenericContainer(
+ dockerImage = "treeverse/lakefs:1.51",
+ exposedPorts = Seq(8000),
+ env = Map(
+ "LAKEFS_BLOCKSTORE_TYPE" -> "s3",
+ "LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE" -> "true",
+ "LAKEFS_BLOCKSTORE_S3_ENDPOINT" ->
s"http://${minio.container.getNetworkAliases.get(0)}:9000",
+ "LAKEFS_BLOCKSTORE_S3_PRE_SIGNED_ENDPOINT" -> "http://localhost:9000",
+ "LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID" -> "texera_minio",
+ "LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY" -> "password",
+ "LAKEFS_AUTH_ENCRYPT_SECRET_KEY" -> "random_string_for_lakefs",
+ "LAKEFS_LOGGING_LEVEL" -> "INFO",
+ "LAKEFS_STATS_ENABLED" -> "1",
+ "LAKEFS_DATABASE_TYPE" -> "postgres",
+ "LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING" -> lakefsDatabaseURL,
+ "LAKEFS_INSTALLATION_USER_NAME" -> lakefsUsername,
+ "LAKEFS_INSTALLATION_ACCESS_KEY_ID" -> lakefsAccessKeyID,
+ "LAKEFS_INSTALLATION_SECRET_ACCESS_KEY" -> lakefsSecretAccessKey
+ )
+ )
+ lakefs.container.withNetwork(network)
+
+ override val container = MultipleContainers(postgres, minio, lakefs)
+
+ def lakefsBaseUrl: String =
s"http://${lakefs.host}:${lakefs.mappedPort(8000)}"
+ def minioEndpoint: String = s"http://${minio.host}:${minio.mappedPort(9000)}"
+
+ override def afterStart(): Unit = {
+ super.afterStart()
+
+ // setup LakeFS
+ val lakefsSetupResult = lakefs.container.execInContainer(
+ "lakefs",
+ "setup",
+ "--user-name",
+ lakefsUsername,
+ "--access-key-id",
+ lakefsAccessKeyID,
+ "--secret-access-key",
+ lakefsSecretAccessKey
+ )
+ if (lakefsSetupResult.getExitCode != 0) {
+ throw new RuntimeException(
+ s"Failed to setup LakeFS: ${lakefsSetupResult.getStderr}"
+ )
+ }
+
+ // replace storage endpoints in StorageConfig
+ StorageConfig.s3Endpoint = minioEndpoint
+ StorageConfig.lakefsEndpoint = s"$lakefsBaseUrl/api/v1"
+
+ // create S3 bucket
+ S3StorageClient.createBucketIfNotExist(StorageConfig.lakefsBucketName)
+ }
+}
diff --git
a/core/file-service/src/test/scala/edu/uci/ics/texera/service/resource/DatasetResourceSpec.scala
b/core/file-service/src/test/scala/edu/uci/ics/texera/service/resource/DatasetResourceSpec.scala
new file mode 100644
index 0000000000..1a41488ebf
--- /dev/null
+++
b/core/file-service/src/test/scala/edu/uci/ics/texera/service/resource/DatasetResourceSpec.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.texera.service.resource
+
+import edu.uci.ics.amber.core.storage.util.LakeFSStorageClient
+import edu.uci.ics.texera.auth.SessionUser
+import edu.uci.ics.texera.dao.MockTexeraDB
+import edu.uci.ics.texera.dao.jooq.generated.enums.UserRoleEnum
+import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{DatasetDao, UserDao}
+import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{Dataset, User}
+import edu.uci.ics.texera.service.MockLakeFS
+import jakarta.ws.rs.{BadRequestException, ForbiddenException}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class DatasetResourceSpec
+ extends AnyFlatSpec
+ with Matchers
+ with MockTexeraDB
+ with MockLakeFS
+ with BeforeAndAfterAll {
+
+ private val testUser: User = {
+ val user = new User
+ user.setName("test_user")
+ user.setPassword("123")
+ user.setEmail("[email protected]")
+ user.setRole(UserRoleEnum.ADMIN)
+ user
+ }
+
+ private val testUser2: User = {
+ val user = new User
+ user.setName("test_user2")
+ user.setPassword("123")
+ user.setEmail("[email protected]")
+ user.setRole(UserRoleEnum.ADMIN)
+ user
+ }
+
+ private val testDataset: Dataset = {
+ val dataset = new Dataset
+ dataset.setName("test-dataset")
+ dataset.setRepositoryName("test-dataset")
+ dataset.setIsPublic(true)
+ dataset.setIsDownloadable(true)
+ dataset.setDescription("dataset for test")
+ dataset
+ }
+
+ lazy val datasetDao = new DatasetDao(getDSLContext.configuration())
+
+ lazy val datasetResource = new DatasetResource()
+
+ lazy val sessionUser = new SessionUser(testUser)
+ lazy val sessionUser2 = new SessionUser(testUser2)
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ // init db
+ initializeDBAndReplaceDSLContext()
+
+ // insert test user
+ val userDao = new UserDao(getDSLContext.configuration())
+ userDao.insert(testUser)
+ userDao.insert(testUser2)
+
+ // insert test dataset
+ testDataset.setOwnerUid(testUser.getUid)
+ datasetDao.insert(testDataset)
+ }
+
+ "createDataset" should "create dataset successfully if user does not have a
dataset with the same name" in {
+ val createDatasetRequest = DatasetResource.CreateDatasetRequest(
+ datasetName = "new-dataset",
+ datasetDescription = "description for new dataset",
+ isDatasetPublic = false,
+ isDatasetDownloadable = true
+ )
+
+ val createdDataset = datasetResource.createDataset(createDatasetRequest,
sessionUser)
+ createdDataset.dataset.getName shouldEqual "new-dataset"
+ createdDataset.dataset.getDescription shouldEqual "description for new
dataset"
+ createdDataset.dataset.getIsPublic shouldBe false
+ createdDataset.dataset.getIsDownloadable shouldBe true
+ }
+
+ it should "refuse to create dataset if user already has a dataset with the
same name" in {
+ val createDatasetRequest = DatasetResource.CreateDatasetRequest(
+ datasetName = "test-dataset",
+ datasetDescription = "description for new dataset",
+ isDatasetPublic = false,
+ isDatasetDownloadable = true
+ )
+
+ assertThrows[BadRequestException] {
+ datasetResource.createDataset(createDatasetRequest, sessionUser)
+ }
+ }
+
+ it should "create dataset successfully if another user has a dataset with
the same name" in {
+ val createDatasetRequest = DatasetResource.CreateDatasetRequest(
+ datasetName = "test-dataset",
+ datasetDescription = "description for new dataset",
+ isDatasetPublic = false,
+ isDatasetDownloadable = true
+ )
+
+ val createdDataset = datasetResource.createDataset(createDatasetRequest,
sessionUser2)
+ createdDataset.dataset.getName shouldEqual "test-dataset"
+ createdDataset.dataset.getDescription shouldEqual "description for new
dataset"
+ createdDataset.dataset.getIsPublic shouldBe false
+ createdDataset.dataset.getIsDownloadable shouldBe true
+ }
+
+ it should "delete dataset successfully if user owns it" in {
+ // insert a dataset directly into DB
+ val dataset = new Dataset
+ dataset.setName("delete-ds")
+ dataset.setRepositoryName("delete-ds")
+ dataset.setDescription("for delete test")
+ dataset.setOwnerUid(testUser.getUid)
+ dataset.setIsPublic(true)
+ dataset.setIsDownloadable(true)
+ datasetDao.insert(dataset)
+
+ // create repo in LakeFS to match dataset
+ LakeFSStorageClient.initRepo(dataset.getRepositoryName)
+
+ // delete via DatasetResource
+ val response = datasetResource.deleteDataset(dataset.getDid, sessionUser)
+
+ // assert: response OK and DB no longer contains dataset
+ response.getStatus shouldEqual 200
+ datasetDao.fetchOneByDid(dataset.getDid) shouldBe null
+ }
+
+ it should "refuse to delete dataset if not owned by user" in {
+ // insert a dataset directly into DB
+ val dataset = new Dataset
+ dataset.setName("user1-ds")
+ dataset.setRepositoryName("user1-ds")
+ dataset.setDescription("for forbidden test")
+ dataset.setOwnerUid(testUser.getUid)
+ dataset.setIsPublic(true)
+ dataset.setIsDownloadable(true)
+ datasetDao.insert(dataset)
+
+ // create repo in LakeFS to match dataset
+ LakeFSStorageClient.initRepo(dataset.getRepositoryName)
+
+ // user2 tries to delete, should throw ForbiddenException
+ assertThrows[ForbiddenException] {
+ datasetResource.deleteDataset(dataset.getDid, sessionUser2)
+ }
+
+ // dataset still exists in DB
+ datasetDao.fetchOneByDid(dataset.getDid) should not be null
+ }
+
+ override protected def afterAll(): Unit = {
+ shutdownDB()
+ }
+}
diff --git
a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
index 274fda9651..f2559babf6 100644
---
a/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
+++
b/core/gui/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts
@@ -61,6 +61,7 @@ export class DatasetDetailComponent implements OnInit {
public datasetIsPublic: boolean = false;
public datasetIsDownloadable: boolean = true;
public userDatasetAccessLevel: "READ" | "WRITE" | "NONE" = "NONE";
+ public ownerEmail: string = "";
public isOwner: boolean = false;
public currentDisplayedFileName: string = "";
@@ -276,6 +277,7 @@ export class DatasetDetailComponent implements OnInit {
this.userDatasetAccessLevel = dashboardDataset.accessPrivilege;
this.datasetIsPublic = dataset.isPublic;
this.datasetIsDownloadable = dataset.isDownloadable;
+ this.ownerEmail = dashboardDataset.ownerEmail;
this.isOwner = dashboardDataset.isOwner;
if (typeof dataset.creationTime === "number") {
const date = new Date(dataset.creationTime);
@@ -421,6 +423,7 @@ export class DatasetDetailComponent implements OnInit {
// Start multipart upload
const subscription = this.datasetService
.multipartUpload(
+ this.ownerEmail,
this.datasetName,
file.name,
file.file,
@@ -558,6 +561,7 @@ export class DatasetDetailComponent implements OnInit {
this.datasetService
.finalizeMultipartUpload(
+ this.ownerEmail,
this.datasetName,
task.filePath,
task.uploadId,
diff --git a/core/gui/src/app/dashboard/service/user/dataset/dataset.service.ts
b/core/gui/src/app/dashboard/service/user/dataset/dataset.service.ts
index c50c08ea0d..40226d0fef 100644
--- a/core/gui/src/app/dashboard/service/user/dataset/dataset.service.ts
+++ b/core/gui/src/app/dashboard/service/user/dataset/dataset.service.ts
@@ -143,6 +143,7 @@ export class DatasetService {
* with a concurrency limit on how many parts we process in parallel.
*/
public multipartUpload(
+ ownerEmail: string,
datasetName: string,
filePath: string,
file: File,
@@ -207,7 +208,7 @@ export class DatasetService {
};
};
- const subscription = this.initiateMultipartUpload(datasetName, filePath,
partCount)
+ const subscription = this.initiateMultipartUpload(ownerEmail,
datasetName, filePath, partCount)
.pipe(
switchMap(initiateResponse => {
const { uploadId, presignedUrls, physicalAddress } =
initiateResponse;
@@ -312,7 +313,15 @@ export class DatasetService {
toArray(),
// 4) Finalize if all parts succeeded
switchMap(() =>
- this.finalizeMultipartUpload(datasetName, filePath, uploadId,
uploadedParts, physicalAddress, false)
+ this.finalizeMultipartUpload(
+ ownerEmail,
+ datasetName,
+ filePath,
+ uploadId,
+ uploadedParts,
+ physicalAddress,
+ false
+ )
),
tap(() => {
const finalTotalTime = (Date.now() - startTime) / 1000;
@@ -343,6 +352,7 @@ export class DatasetService {
});
return this.finalizeMultipartUpload(
+ ownerEmail,
datasetName,
filePath,
uploadId,
@@ -363,17 +373,20 @@ export class DatasetService {
/**
* Initiates a multipart upload and retrieves presigned URLs for each part.
+ * @param ownerEmail Owner's email
* @param datasetName Dataset Name
* @param filePath File path within the dataset
* @param numParts Number of parts for the multipart upload
*/
private initiateMultipartUpload(
+ ownerEmail: string,
datasetName: string,
filePath: string,
numParts: number
): Observable<{ uploadId: string; presignedUrls: string[]; physicalAddress:
string }> {
const params = new HttpParams()
.set("type", "init")
+ .set("ownerEmail", ownerEmail)
.set("datasetName", datasetName)
.set("filePath", encodeURIComponent(filePath))
.set("numParts", numParts.toString());
@@ -389,6 +402,7 @@ export class DatasetService {
* Completes or aborts a multipart upload, sending part numbers and ETags to
the backend.
*/
public finalizeMultipartUpload(
+ ownerEmail: string,
datasetName: string,
filePath: string,
uploadId: string,
@@ -398,6 +412,7 @@ export class DatasetService {
): Observable<Response> {
const params = new HttpParams()
.set("type", isAbort ? "abort" : "finish")
+ .set("ownerEmail", ownerEmail)
.set("datasetName", datasetName)
.set("filePath", encodeURIComponent(filePath))
.set("uploadId", uploadId);
diff --git a/core/scripts/sql/texera_ddl.sql b/core/scripts/sql/texera_ddl.sql
index e98f8e2172..b9cac6f545 100644
--- a/core/scripts/sql/texera_ddl.sql
+++ b/core/scripts/sql/texera_ddl.sql
@@ -232,6 +232,7 @@ CREATE TABLE IF NOT EXISTS dataset
did SERIAL PRIMARY KEY,
owner_uid INT NOT NULL,
name VARCHAR(128) NOT NULL,
+ repository_name VARCHAR(128),
is_public BOOLEAN NOT NULL DEFAULT TRUE,
is_downloadable BOOLEAN NOT NULL DEFAULT TRUE,
description VARCHAR(512) NOT NULL,
diff --git a/core/scripts/sql/updates/15.sql b/core/scripts/sql/updates/15.sql
new file mode 100644
index 0000000000..fa1959d984
--- /dev/null
+++ b/core/scripts/sql/updates/15.sql
@@ -0,0 +1,38 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+
+SET search_path TO texera_db;
+
+-- ============================================
+-- 2. Update the table schema
+-- ============================================
+BEGIN;
+
+-- 1. Add new column repository_name to dataset table.
+ALTER TABLE dataset
+ ADD COLUMN repository_name varchar(128);
+
+-- 2. Copy the data from name column to repository_name column.
+UPDATE dataset
+SET repository_name = name;
+
+COMMIT;
diff --git
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
index 533b84f986..6bf138b4c0 100644
---
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
+++
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/FileResolver.scala
@@ -80,8 +80,8 @@ object FileResolver {
*
* The fileName format should be:
/ownerEmail/datasetName/versionName/fileRelativePath
* e.g. /[email protected]/twitterDataset/v1/california/irvine/tw1.csv
- * The output dataset URI format is:
{DATASET_FILE_URI_SCHEME}:///{did}/{versionHash}/file-path
- * e.g. {DATASET_FILE_URI_SCHEME}:///15/adeq233td/some/dir/file.txt
+ * The output dataset URI format is:
{DATASET_FILE_URI_SCHEME}:///{repositoryName}/{versionHash}/fileRelativePath
+ * e.g.
{DATASET_FILE_URI_SCHEME}:///dataset-15/adeq233td/some/dir/file.txt
*
* @param fileName the name of the file to attempt resolving as a
DatasetFileDocument
* @return Either[String, DatasetFileDocument] - Right(document) if
creation succeeds
@@ -138,11 +138,11 @@ object FileResolver {
// Prepend dataset name and versionHash to the encoded path segments
val allPathSegments = Array(
- datasetName,
+ dataset.getRepositoryName,
datasetVersion.getVersionHash
) ++ encodedFileRelativePath
- // Build the format /{datasetName}/{versionHash}/{fileRelativePath}, both
Linux and Windows use forward slash as the splitter
+ // Build the format /{repositoryName}/{versionHash}/{fileRelativePath},
both Linux and Windows use forward slash as the splitter
val uriSplitter = "/"
val encodedPath = uriSplitter + allPathSegments.mkString(uriSplitter)
diff --git
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala
index 37a44ee089..872a6f5922 100644
---
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala
+++
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/DatasetFileDocument.scala
@@ -62,18 +62,18 @@ private[storage] class DatasetFileDocument(uri: URI)
if (segments.length < 3)
throw new IllegalArgumentException("URI format is incorrect")
- // TODO: consider whether use dataset name or did
- val datasetName = segments(0)
+ // parse uri to dataset components
+ val repositoryName = segments(0)
val datasetVersionHash = URLDecoder.decode(segments(1),
StandardCharsets.UTF_8)
val decodedRelativeSegments =
segments.drop(2).map(part => URLDecoder.decode(part,
StandardCharsets.UTF_8))
val fileRelativePath = Paths.get(decodedRelativeSegments.head,
decodedRelativeSegments.tail: _*)
- (datasetName, datasetVersionHash, fileRelativePath)
+ (repositoryName, datasetVersionHash, fileRelativePath)
}
// Extract components from URI using the utility function
- private val (datasetName, datasetVersionHash, fileRelativePath) =
parseUri(uri)
+ private val (repositoryName, datasetVersionHash, fileRelativePath) =
parseUri(uri)
private var tempFile: Option[File] = None
@@ -84,7 +84,7 @@ private[storage] class DatasetFileDocument(uri: URI)
def fallbackToLakeFS(exception: Throwable): InputStream = {
logger.warn(s"${exception.getMessage}. Falling back to LakeFS direct
file fetch.", exception)
val file = LakeFSStorageClient.getFileFromRepo(
- getDatasetName(),
+ getRepositoryName(),
getVersionHash(),
getFileRelativePath()
)
@@ -94,7 +94,7 @@ private[storage] class DatasetFileDocument(uri: URI)
if (userJwtToken.isEmpty) {
try {
val presignUrl = LakeFSStorageClient.getFilePresignedUrl(
- getDatasetName(),
+ getRepositoryName(),
getVersionHash(),
getFileRelativePath()
)
@@ -105,7 +105,7 @@ private[storage] class DatasetFileDocument(uri: URI)
}
} else {
val presignRequestUrl =
-
s"$fileServiceGetPresignURLEndpoint?datasetName=${getDatasetName()}&commitHash=${getVersionHash()}&filePath=${URLEncoder
+
s"$fileServiceGetPresignURLEndpoint?repositoryName=${getRepositoryName()}&commitHash=${getVersionHash()}&filePath=${URLEncoder
.encode(getFileRelativePath(), StandardCharsets.UTF_8.name())}"
val connection = new
URL(presignRequestUrl).openConnection().asInstanceOf[HttpURLConnection]
@@ -176,9 +176,9 @@ private[storage] class DatasetFileDocument(uri: URI)
)
}
- override def getVersionHash(): String = datasetVersionHash
+ override def getRepositoryName(): String = repositoryName
- override def getDatasetName(): String = datasetName
+ override def getVersionHash(): String = datasetVersionHash
override def getFileRelativePath(): String = fileRelativePath.toString
}
diff --git
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/OnDataset.scala
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/OnDataset.scala
index 7c58f4651f..79d96e3936 100644
---
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/OnDataset.scala
+++
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/model/OnDataset.scala
@@ -20,7 +20,7 @@
package edu.uci.ics.amber.core.storage.model
trait OnDataset {
- def getDatasetName(): String
+ def getRepositoryName(): String
def getVersionHash(): String
diff --git
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/LakeFSStorageClient.scala
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/LakeFSStorageClient.scala
index 4cabed1e7b..65a300eb3c 100644
---
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/LakeFSStorageClient.scala
+++
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/LakeFSStorageClient.scala
@@ -77,22 +77,22 @@ object LakeFSStorageClient {
* Initializes a new repository in LakeFS.
*
* @param repoName Name of the repository.
- * @param defaultBranch Default branch name, usually "main".
*/
def initRepo(
repoName: String
): Repository = {
+ // validate repoName, see
https://docs.lakefs.io/latest/understand/model/#repository
val repoNamePattern = "^[a-z0-9][a-z0-9-]{2,62}$".r
-
- // Validate repoName
if (!repoNamePattern.matches(repoName)) {
throw new IllegalArgumentException(
- s"Invalid dataset name: '$repoName'. " +
- "Dataset names must be 3-63 characters long, " +
+ s"Invalid repository name: '$repoName'. " +
+ "Repository names must be 3-63 characters long, " +
"contain only lowercase letters, numbers, and hyphens, " +
- "and cannot start or end with a hyphen."
+ "and cannot start with a hyphen."
)
}
+
+ // create repository
val storageNamespace = s"$storageNamespaceURI/$repoName"
val repo = new RepositoryCreation()
.name(repoName)
@@ -108,7 +108,6 @@ object LakeFSStorageClient {
* Converts the InputStream to a temporary file for upload.
*
* @param repoName Repository name.
- * @param branch Branch name.
* @param filePath Path in the repository.
* @param inputStream File content stream.
*/
@@ -161,7 +160,6 @@ object LakeFSStorageClient {
* Executes operations and creates a commit (similar to a transactional
commit).
*
* @param repoName Repository name.
- * @param branch Branch name.
* @param commitMessage Commit message.
* @param operations File operations to perform before committing.
*/
@@ -291,7 +289,7 @@ object LakeFSStorageClient {
repoApi.deleteRepository(repoName).execute()
}
- def retrieveVersionsOfRepository(repoName: String): List[Commit] = {
+ private def retrieveVersionsOfRepository(repoName: String): List[Commit] = {
refsApi
.logCommits(repoName, branchName)
.execute()
diff --git
a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/FileResolverSpec.scala
b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/FileResolverSpec.scala
index d7263cd823..787510d8f1 100644
---
a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/FileResolverSpec.scala
+++
b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/FileResolverSpec.scala
@@ -50,6 +50,7 @@ class FileResolverSpec
val dataset = new Dataset
dataset.setDid(Integer.valueOf(1))
dataset.setName("test_dataset")
+ dataset.setRepositoryName("test_dataset")
dataset.setDescription("dataset for test")
dataset.setIsPublic(true)
dataset.setOwnerUid(Integer.valueOf(1))
@@ -110,10 +111,10 @@ class FileResolverSpec
val dataset1TxtUri = FileResolver.resolve(dataset1TxtFilePath)
assert(
- datasetACsvUri.toString ==
f"${FileResolver.DATASET_FILE_URI_SCHEME}:///${testDataset.getName}/${testDatasetVersion2.getVersionHash}/directory/a.csv"
+ datasetACsvUri.toString ==
f"${FileResolver.DATASET_FILE_URI_SCHEME}:///${testDataset.getRepositoryName}/${testDatasetVersion2.getVersionHash}/directory/a.csv"
)
assert(
- dataset1TxtUri.toString ==
f"${FileResolver.DATASET_FILE_URI_SCHEME}:///${testDataset.getName}/${testDatasetVersion1.getVersionHash}/1.txt"
+ dataset1TxtUri.toString ==
f"${FileResolver.DATASET_FILE_URI_SCHEME}:///${testDataset.getRepositoryName}/${testDatasetVersion1.getVersionHash}/1.txt"
)
}
diff --git a/deployment/k8s/texera-helmchart/files/texera_ddl.sql
b/deployment/k8s/texera-helmchart/files/texera_ddl.sql
index bece11a542..67c5f573ec 100644
--- a/deployment/k8s/texera-helmchart/files/texera_ddl.sql
+++ b/deployment/k8s/texera-helmchart/files/texera_ddl.sql
@@ -267,6 +267,7 @@ CREATE TABLE IF NOT EXISTS dataset
did SERIAL PRIMARY KEY,
owner_uid INT NOT NULL,
name VARCHAR(128) NOT NULL,
+ repository_name VARCHAR(128),
is_public BOOLEAN NOT NULL DEFAULT TRUE,
is_downloadable BOOLEAN NOT NULL DEFAULT TRUE,
description VARCHAR(512) NOT NULL,