This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5607-539a68551288a951d4c0a6d262105a8e6c211790 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 597481595725bdfaa0e73bc18e140be465ab53e1
Author: Suyash Jain <[email protected]>
AuthorDate: Wed Jun 10 16:36:07 2026 -0700

fix(file-service): apply LakeFS error handling to all call sites (#5607)

### What changes were proposed in this PR?

#4177 introduced `LakeFSExceptionHandler.withLakeFSErrorHandling`, but only the multipart-upload and dataset-version paths used it. The remaining LakeFS call sites in `DatasetResource` either leaked raw `io.lakefs.clients.sdk.ApiException` to Dropwizard (an opaque 500 for the frontend) or caught `Exception` and rewrapped it as a generic 500, discarding the real LakeFS status code (401/403/404/409/...).

```
Before: LakeFS 404 -> raw ApiException / catch(Exception) -> 500 "Failed to ..."
After: LakeFS 404 -> withLakeFSErrorHandling -> 404 "Error while deleting file 'a.csv' ...: LakeFS resource not found. ..."
```

Changes:

| Change | Where |
| --- | --- |
| New overload `withLakeFSErrorHandling(operation: String)(call)` that prefixes the user-visible message with the failed operation | `LakeFSExceptionHandler.scala` |
| 8 bare LakeFS calls now wrapped (size lookup, version listing, zip download, presigned URLs, cover image) | `DatasetResource.scala` |
| 5 `catch Exception -> generic 500` blocks now use the handler; compensation logic (DB rollback on failed repo init, multipart abort) is preserved, and the abort-on-failure cleanup no longer masks the original error | `DatasetResource.scala` |

Intentionally unchanged: best-effort cleanup sites that deliberately swallow errors, the per-dataset skip in `listDatasets`, and the `FileService` startup health check (failing fast at boot is correct there).

### Any related issues, documentation, discussions?

Closes #4176

### How was this PR tested?

New `LakeFSExceptionHandlerSpec` (7 unit cases): status-code mapping (400/401/403/404/409/4xx/5xx/unknown), operation context included in the frontend-visible message, success passthrough, and non-LakeFS exceptions propagating untouched.
New integration case in `DatasetResourceSpec`: deleting a dataset whose LakeFS repository does not exist now yields `NotFoundException` (404) instead of a generic 500. ``` sbt "FileService/testOnly org.apache.texera.service.util.LakeFSExceptionHandlerSpec" # Tests: succeeded 7, failed 0 sbt "FileService/testOnly org.apache.texera.service.resource.DatasetResourceSpec" # Tests: succeeded 94, failed 0 (Testcontainers: LakeFS 1.51 + MinIO + Postgres) ``` `sbt FileService/scalafixAll` and `sbt FileService/scalafmtAll` produce no further diff. ### Was this PR authored or co-authored using generative AI tooling? Yes, partially. I (Suyash Jain) worked on this PR together with Claude Code as a pair-programming assistant. I reviewed the final diff and ran the unit and Testcontainers-based integration suites locally before opening the PR. Generated-by: Claude Code (Claude Opus 4.7) --- .../texera/service/resource/DatasetResource.scala | 150 ++++++++++++--------- .../service/util/LakeFSExceptionHandler.scala | 25 +++- .../service/resource/DatasetResourceSpec.scala | 17 +++ .../service/util/LakeFSExceptionHandlerSpec.scala | 103 ++++++++++++++ 4 files changed, 227 insertions(+), 68 deletions(-) diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index 6da19a924f..1f8f28a85c 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -251,7 +251,9 @@ class DatasetResource extends LazyLogging { getOwner(ctx, did).getEmail, userAccessPrivilege, isOwner, - LakeFSStorageClient.retrieveRepositorySize(targetDataset.getRepositoryName) + withLakeFSErrorHandling(s"retrieving the size of dataset '${targetDataset.getName}'") { + LakeFSStorageClient.retrieveRepositorySize(targetDataset.getRepositoryName) + } ) } @@ -309,16 +311,23 
@@ class DatasetResource extends LazyLogging { // Initialize the repository in LakeFS val repositoryName = s"dataset-${createdDataset.getDid}" try { - LakeFSStorageClient.initRepo(repositoryName) + withLakeFSErrorHandling(s"creating the repository of dataset '${dataset.getName}'") { + LakeFSStorageClient.initRepo(repositoryName) + } } catch { case e: Exception => + // roll back the dataset record so a failed LakeFS init leaves no orphan row ctx .deleteFrom(DATASET) .where(DATASET.DID.eq(createdDataset.getDid)) .execute() - throw new WebApplicationException( - s"Failed to create the dataset: ${e.getMessage}" - ) + e match { + case web: WebApplicationException => throw web + case other => + throw new WebApplicationException( + s"Failed to create the dataset: ${other.getMessage}" + ) + } } // update repository name of the created dataset @@ -444,14 +453,8 @@ class DatasetResource extends LazyLogging { // throw the exception that user has no access to certain dataset throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) } - try { + withLakeFSErrorHandling(s"deleting the repository of dataset '${dataset.getName}'") { LakeFSStorageClient.deleteRepo(dataset.getRepositoryName) - } catch { - case e: Exception => - throw new WebApplicationException( - s"Failed to delete a repository in LakeFS: ${e.getMessage}", - e - ) } // delete the directory on S3 if ( @@ -600,30 +603,39 @@ class DatasetResource extends LazyLogging { flush() // ---------- complete upload ---------- - LakeFSStorageClient.completePresignedMultipartUploads( - repoName, - filePath, - uploadId, - completedParts.toList, - physicalAddress - ) + withLakeFSErrorHandling(s"completing the multipart upload of file '$filePath'") { + LakeFSStorageClient.completePresignedMultipartUploads( + repoName, + filePath, + uploadId, + completedParts.toList, + physicalAddress + ) + } Response.ok(Map("message" -> s"Uploaded $filePath in ${completedParts.size} parts")).build() } } catch { case e: Exception => if 
(repoName != null && filePath != null && uploadId != null && physicalAddress != null) { - LakeFSStorageClient.abortPresignedMultipartUploads( - repoName, - filePath, - uploadId, - physicalAddress - ) + // best-effort cleanup; never let an abort failure mask the original error + try { + LakeFSStorageClient.abortPresignedMultipartUploads( + repoName, + filePath, + uploadId, + physicalAddress + ) + } catch { case _: Throwable => () } + } + e match { + case web: WebApplicationException => throw web + case other => + throw new WebApplicationException( + s"Failed to upload file to dataset: ${other.getMessage}", + other + ) } - throw new WebApplicationException( - s"Failed to upload file to dataset: ${e.getMessage}", - e - ) } } @@ -693,14 +705,8 @@ class DatasetResource extends LazyLogging { // Decode the file path val filePath = URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name()) - // Try to initialize the repository in LakeFS - try { + withLakeFSErrorHandling(s"deleting file '$filePath' from the dataset repository") { LakeFSStorageClient.deleteObject(repositoryName, filePath) - } catch { - case e: Exception => - throw new WebApplicationException( - s"Failed to delete the file from repo in LakeFS: ${e.getMessage}" - ) } Response.ok().build() @@ -1042,14 +1048,8 @@ class DatasetResource extends LazyLogging { // Decode the file path val filePath = URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name()) - // Try to reset the file change in LakeFS - try { + withLakeFSErrorHandling(s"resetting uncommitted changes of file '$filePath'") { LakeFSStorageClient.resetObjectUploadOrDeletion(repositoryName, filePath) - } catch { - case e: Exception => - throw new WebApplicationException( - s"Failed to reset the changes from repo in LakeFS: ${e.getMessage}" - ) } Response.ok().build() } @@ -1255,7 +1255,11 @@ class DatasetResource extends LazyLogging { val datasetName = dataset.getName val repositoryName = dataset.getRepositoryName val versionHash = 
datasetVersion.getVersionHash - val objects = LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, versionHash) + val objects = withLakeFSErrorHandling( + s"listing files of version '$versionHash' of dataset '$datasetName'" + ) { + LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, versionHash) + } if (objects.isEmpty) { return Response @@ -1271,7 +1275,9 @@ class DatasetResource extends LazyLogging { try { objects.foreach { obj => val filePath = obj.getPath - val file = LakeFSStorageClient.getFileFromRepo(repositoryName, versionHash, filePath) + val file = withLakeFSErrorHandling(s"downloading file '$filePath' for the zip") { + LakeFSStorageClient.getFileFromRepo(repositoryName, versionHash, filePath) + } zipOut.putNextEntry(new ZipEntry(filePath)) Files.copy(Paths.get(file.toURI), zipOut) @@ -1432,11 +1438,15 @@ class DatasetResource extends LazyLogging { errorResponse case Right((resolvedRepositoryName, resolvedCommitHash, resolvedFilePath)) => - val url = LakeFSStorageClient.getFilePresignedUrl( - resolvedRepositoryName, - resolvedCommitHash, - resolvedFilePath - ) + val url = withLakeFSErrorHandling( + s"generating a presigned URL for file '$resolvedFilePath'" + ) { + LakeFSStorageClient.getFilePresignedUrl( + resolvedRepositoryName, + resolvedCommitHash, + resolvedFilePath + ) + } Response.ok(Map("presignedUrl" -> url)).build() } @@ -2125,11 +2135,13 @@ class DatasetResource extends LazyLogging { ) .asInstanceOf[OnDataset] - val fileSize = LakeFSStorageClient.getFileSize( - document.getRepositoryName(), - document.getVersionHash(), - document.getFileRelativePath() - ) + val fileSize = withLakeFSErrorHandling(s"reading the size of cover image '$normalized'") { + LakeFSStorageClient.getFileSize( + document.getRepositoryName(), + document.getVersionHash(), + document.getFileRelativePath() + ) + } if (fileSize > COVER_IMAGE_SIZE_LIMIT_BYTES) { throw new BadRequestException( @@ -2179,11 +2191,15 @@ class DatasetResource extends LazyLogging { 
.openReadonlyDocument(FileResolver.resolve(fullPath)) .asInstanceOf[OnDataset] - val presignedUrl = LakeFSStorageClient.getFilePresignedUrl( - document.getRepositoryName(), - document.getVersionHash(), - document.getFileRelativePath() - ) + val presignedUrl = withLakeFSErrorHandling( + s"generating a presigned URL for cover image '$coverImage'" + ) { + LakeFSStorageClient.getFilePresignedUrl( + document.getRepositoryName(), + document.getVersionHash(), + document.getFileRelativePath() + ) + } Response.temporaryRedirect(new URI(presignedUrl)).build() } @@ -2224,11 +2240,15 @@ class DatasetResource extends LazyLogging { .openReadonlyDocument(FileResolver.resolve(fullPath)) .asInstanceOf[OnDataset] - val presignedUrl = LakeFSStorageClient.getFilePresignedUrl( - document.getRepositoryName(), - document.getVersionHash(), - document.getFileRelativePath() - ) + val presignedUrl = withLakeFSErrorHandling( + s"generating a presigned URL for cover image '$coverImage'" + ) { + LakeFSStorageClient.getFilePresignedUrl( + document.getRepositoryName(), + document.getVersionHash(), + document.getFileRelativePath() + ) + } Response.ok(Map("url" -> presignedUrl)).build() } diff --git a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala index c1997fb647..1894817c43 100644 --- a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala +++ b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala @@ -46,17 +46,36 @@ object LakeFSExceptionHandler { try { call } catch { - case e: io.lakefs.clients.sdk.ApiException => handleException(e) + case e: io.lakefs.clients.sdk.ApiException => handleException(e, None) + } + } + + /** + * Wraps a LakeFS call with centralized error handling, prefixing the + * user-visible message with the operation that failed + * (e.g. "deleting file 'a.csv' from dataset 'd1'"). 
+ */ + def withLakeFSErrorHandling[T](operation: String)(call: => T): T = { + try { + call + } catch { + case e: io.lakefs.clients.sdk.ApiException => handleException(e, Some(operation)) } } /** * Converts LakeFS ApiException to appropriate HTTP exception */ - private def handleException(e: io.lakefs.clients.sdk.ApiException): Nothing = { + private def handleException( + e: io.lakefs.clients.sdk.ApiException, + operation: Option[String] + ): Nothing = { val code = e.getCode val rawBody = Option(e.getResponseBody).filter(_.nonEmpty) - val message = s"${fallbackMessages(code)}" + val message = operation match { + case Some(op) => s"Error while $op: ${fallbackMessages(code)}" + case None => fallbackMessages(code) + } logger.warn(s"LakeFS error $code, ${e.getMessage}, body: ${rawBody.getOrElse("N/A")}") diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index eec7628a0b..9a503d15fe 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -380,6 +380,23 @@ class DatasetResourceSpec datasetDao.fetchOneByDid(dataset.getDid) should not be null } + it should "surface a LakeFS 404 as NotFoundException when deleting a dataset whose repo is missing" in { + val dataset = new Dataset + dataset.setName("delete-ds-no-repo") + dataset.setRepositoryName("delete-ds-no-repo") + dataset.setDescription("for lakefs 404 mapping test") + dataset.setOwnerUid(ownerUser.getUid) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + datasetDao.insert(dataset) + // intentionally no LakeFSStorageClient.initRepo: the repository does not exist in LakeFS + + val ex = intercept[NotFoundException] { + datasetResource.deleteDataset(dataset.getDid, sessionUser) + } + assertStatus(ex, 404) + } + "listDatasets" 
should "include a dataset whose LakeFS repo exists" in { val repoName = s"list-ok-${System.nanoTime()}" val dataset = new Dataset diff --git a/file-service/src/test/scala/org/apache/texera/service/util/LakeFSExceptionHandlerSpec.scala b/file-service/src/test/scala/org/apache/texera/service/util/LakeFSExceptionHandlerSpec.scala new file mode 100644 index 0000000000..3e9601d74b --- /dev/null +++ b/file-service/src/test/scala/org/apache/texera/service/util/LakeFSExceptionHandlerSpec.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.service.util + +import io.lakefs.clients.sdk.ApiException +import jakarta.ws.rs._ +import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class LakeFSExceptionHandlerSpec extends AnyFlatSpec with Matchers { + + private def lakeFSError(code: Int): ApiException = + new ApiException(code, s"lakefs returned $code") + + // typed as Unit (not Nothing) so the no-context overload resolves unambiguously + private def failingCall(code: Int): Unit = throw lakeFSError(code) + + private def entityMessage(e: WebApplicationException): String = + e.getResponse.getEntity + .asInstanceOf[java.util.Map[String, String]] + .get("message") + + "withLakeFSErrorHandling" should "return the call's result when no exception is thrown" in { + withLakeFSErrorHandling("reading a file")(42) shouldEqual 42 + } + + it should "map each LakeFS client error code to the matching JAX-RS exception" in { + val expected = Map( + 400 -> classOf[BadRequestException], + 401 -> classOf[NotAuthorizedException], + 403 -> classOf[ForbiddenException], + 404 -> classOf[NotFoundException] + ) + expected.foreach { + case (code, exceptionClass) => + val thrown = intercept[WebApplicationException] { + withLakeFSErrorHandling(failingCall(code)) + } + thrown.getClass shouldEqual exceptionClass + thrown.getResponse.getStatus shouldEqual code + } + } + + it should "keep the original status for other 4xx codes (e.g. 
409 conflict)" in { + val thrown = intercept[WebApplicationException] { + withLakeFSErrorHandling(failingCall(409)) + } + thrown.getResponse.getStatus shouldEqual 409 + } + + it should "map server-side and unknown codes to a 500 response" in { + Seq(500, 502, 503, 0).foreach { code => + val thrown = intercept[InternalServerErrorException] { + withLakeFSErrorHandling(failingCall(code)) + } + thrown.getResponse.getStatus shouldEqual 500 + } + } + + it should "include the operation context in the message visible to the frontend" in { + val thrown = intercept[NotFoundException] { + withLakeFSErrorHandling("deleting file 'a.csv' from dataset 'd1'") { + throw lakeFSError(404) + } + } + val message = entityMessage(thrown) + message should include("deleting file 'a.csv' from dataset 'd1'") + message should include("not found") + } + + it should "produce a frontend-readable message without operation context too" in { + val thrown = intercept[ForbiddenException] { + withLakeFSErrorHandling(failingCall(403)) + } + entityMessage(thrown) should include("Permission denied") + } + + it should "let non-LakeFS exceptions propagate unchanged" in { + val original = new IllegalStateException("not a lakefs error") + val thrown = intercept[IllegalStateException] { + withLakeFSErrorHandling("any operation")(throw original) + } + thrown should be theSameInstanceAs original + } +}
