JasperSch commented on issue #11934: URL: https://github.com/apache/arrow/issues/11934#issuecomment-1009778419
@paleolimbot Yes, that would be reasonable. I decided to open it here in the first place because I suspect the root cause of the issue lies in the way `arrow::write_dataset` writes the files to S3. Below is an extended version of my example above. Please ignore the implementation of `put_object`; I had to patch it because the version in `minio.s3` threw some errors. The example also reproduces with the AWS backend and `aws.s3::put_object`.

Also (not shown here, but sketched after the example below), writing the files locally with `arrow::write_dataset` and then uploading them to S3 with `aws.s3::put_object` lets you download them afterwards with `aws.s3::save_object` without errors. My conclusion is therefore that `arrow::write_dataset` puts files on S3 in a different way than `aws.s3::put_object` does, and that this difference later triggers (spurious) errors when downloading files that are in fact perfectly valid. Maybe it is something in the metadata of the objects? Indexing? So, to me it is still an open question whether `arrow::write_dataset` or `aws.s3::save_object` should be fixed. Maybe it is best to understand this first and rule `arrow::write_dataset` out before opening an issue [here](https://github.com/cloudyr/aws.s3/issues)?

```r
# make sure we can connect
s3_uri <- "s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000"
bucket <- arrow::s3_bucket(s3_uri)
bucket$ls("bucket")
# > [1] "bucket/test"

# write a dataset to minio
data <- data.frame(x = letters[1:5])
arrow::write_dataset(
  dataset = data,
  path = bucket$path("bucket/test")
)

Sys.setenv("AWS_ACCESS_KEY_ID" = "minioadmin",     # enter your credentials
           "AWS_SECRET_ACCESS_KEY" = "minioadmin", # enter your credentials
           "AWS_DEFAULT_REGION" = "eu-west-1",
           "AWS_S3_ENDPOINT" = "localhost:9000")

setwd(tempdir())
minio.s3::save_object(
  object = "test/part-0.parquet",
  bucket = "bucket",
  file = "test",
  use_https = F
)
# Error: 'PAR124L
# ' does not exist in current working directory
system("ls")
# test

# FIX for the minio.s3 put_object function.
put_object <- function(file, object, bucket, multipart = FALSE,
                       acl = c("private", "public-read", "public-read-write",
                               "aws-exec-read", "authenticated-read",
                               "bucket-owner-read", "bucket-owner-full-control"),
                       headers = list(), base_url, region, key, secret, ...) {
  # fall back to the usual environment variables for connection settings
  if (missing(base_url)) {
    base_url <- Sys.getenv("AWS_S3_ENDPOINT")
  }
  if (missing(region)) {
    region <- Sys.getenv("AWS_DEFAULT_REGION")
  }
  if (missing(key)) {
    key <- Sys.getenv("AWS_ACCESS_KEY_ID")
  }
  if (missing(secret)) {
    secret <- Sys.getenv("AWS_SECRET_ACCESS_KEY")
  }
  acl <- match.arg(acl)
  headers <- c(list(`x-amz-acl` = acl), headers)

  if (isTRUE(multipart)) {
    if (is.character(file) && file.exists(file)) {
      file <- readBin(file, what = "raw")
    }
    size <- length(file)
    partsize <- 1e8 # 100 MB
    nparts <- ceiling(size / partsize)

    # if file is small, there is no need for multipart upload
    if (size < partsize) {
      put_object(file = file, object = object, bucket = bucket,
                 multipart = FALSE, headers = headers, ...)
      return(TRUE)
    }

    # function to call abort if any part fails
    abort <- function(id) {
      delete_object(object = object, bucket = bucket,
                    query = list(uploadId = id), ...)
    }

    # split object into parts
    seqparts <- seq_len(partsize)
    parts <- list()
    for (i in seq_len(nparts)) {
      parts[[i]] <- head(file, partsize)
      if (i < nparts) {
        file <- file[-seqparts]
      }
    }

    # initialize the upload
    initialize <- post_object(file = NULL, object = object, bucket = bucket,
                              query = list(uploads = ""), headers = headers, ...)
    id <- initialize[["UploadId"]]

    # loop over parts
    partlist <- list(Number = character(length(parts)),
                     ETag = character(length(parts)))
    for (i in seq_along(parts)) {
      query <- list(partNumber = i, uploadId = id)
      r <- try(put_object(file = parts[[i]], object = object, bucket = bucket,
                          multipart = FALSE, headers = headers, query = query),
               silent = FALSE)
      if (inherits(r, "try-error")) {
        abort(id)
        stop("Multipart upload failed.")
      } else {
        partlist[["Number"]][i] <- i
        partlist[["ETag"]][i] <- attributes(r)[["ETag"]]
      }
    }

    # complete the multipart upload
    complete_parts(object = object, bucket = bucket, id = id,
                   parts = partlist, ...)
    return(TRUE)
  } else {
    # single PUT request
    r <- minio.s3::s3HTTP(
      verb = "PUT",
      bucket = bucket,
      path = paste0("/", object),
      headers = c(headers, list(
        `Content-Length` = ifelse(is.character(file) && file.exists(file),
                                  file.size(file), length(file))
      )),
      request_body = file,
      write_disk = NULL,
      accelerate = FALSE,
      dualstack = FALSE,
      parse_response = TRUE,
      check_region = FALSE,
      url_style = c("path", "virtual"),
      base_url = base_url,
      verbose = getOption("verbose", FALSE),
      region = region,
      key = key,
      secret = secret,
      session_token = NULL,
      use_https = FALSE
    )
    return(TRUE)
  }
}

# re-upload the same file with the patched put_object ...
put_object(
  object = "test/part-0.parquet",
  bucket = "bucket",
  file = "test",
  use_https = T
)
# ... and now downloading works
minio.s3::save_object(
  object = "test/part-0.parquet",
  bucket = "bucket",
  file = "test",
  use_https = F
)
# No error anymore!
```
