JasperSch commented on issue #11934:
URL: https://github.com/apache/arrow/issues/11934#issuecomment-1009778419


   @paleolimbot 
   
   Yes, that would be reasonable. Decided to open it here in the first place 
since I've got the feeling that the root cause of the issues lies in the way 
`arrow::write_dataset` writes the files to s3.
   
   See below an extended version of my example above.
   Please ignore the implementation of `put_object`. I had to fix it since the 
the version of `minio.s3` threw some errors.
   The example also still holds with aws backend and using `aws.s3::put_object`.
   
   Also (not shown here) creating the files locally with `arrow::write_dataset` 
and afterwards uploading to s3 using `aws.s3::put_object` allows you to 
afterwards download the files with `aws.s3::save_object` without errors.
   
   Thus conclusively,  my assumption is that `arrow::write_dataset` puts files 
on `s3` in another way than what `aws.s3::put_object` does. Hereby, something 
goes wrong with the files, which later on throws (unneeded) errors when 
downloading (indeed perfectly valid) files. Maybe it's something with the 
metadata about the files? Indexing? ...?
   
   So, to me it's still a question whether `arrow::write_dataset` or 
`aws.3::save_object` should be fixed.
   Maybe it's best to understand this first and get `arrow::write_dataset` of 
the table before opening an issue  
[here](https://github.com/cloudyr/aws.s3/issues)?
   
   ```
   # make sure we can connect
   s3_uri <- 
"s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000"
   bucket <- arrow::s3_bucket(s3_uri)
   bucket$ls("bucket")
   # > [1] "bucket/test"
   
   # write a dataset to minio
   data <- data.frame(x = letters[1:5])
   
   arrow::write_dataset(
       dataset = data,
       path = bucket$path("bucket/test")
   )
   
   
   Sys.setenv("AWS_ACCESS_KEY_ID" = "minioadmin", # enter your credentials
       "AWS_SECRET_ACCESS_KEY" = "minioadmin", # enter your credentials
       "AWS_DEFAULT_REGION" = "eu-west-1",
       "AWS_S3_ENDPOINT" = "localhost:9000")   
   
   setwd((tempdir()))
   minio.s3::save_object(
       object = "test/part-0.parquet",
       bucket = "bucket",
       file = "test",
       use_https = F
   )
   # Error: 'PAR124L
   # ' does not exist in current working directory
   
   system("ls")
   # test
   
   # FIX for minio.s3 put_object function.
   put_object <- function(file, 
       object, 
       bucket, 
       multipart = FALSE, 
       acl = c("private", "public-read", "public-read-write", 
           "aws-exec-read", "authenticated-read", 
           "bucket-owner-read", "bucket-owner-full-control"),
       headers = list(),
       base_url,
       region,
       key,
       secret,
       ...) {
     
     if (missing(base_url)) {
       base_url = Sys.getenv("AWS_S3_ENDPOINT")
     } 
     
     
     if (missing(region)) {
       region = Sys.getenv("AWS_DEFAULT_REGION")
     } 
     
     if (missing(key)) {
       key = Sys.getenv("AWS_ACCESS_KEY_ID")
     } 
     
     if (missing(secret)) {
       secret = Sys.getenv("AWS_SECRET_ACCESS_KEY")
     }      
     
     
     
     acl <- match.arg(acl)
     headers <- c(list(`x-amz-acl` = acl), headers)
     if (isTRUE(multipart)) {
       if (is.character(file) && file.exists(file)) {
         file <- readBin(file, what = "raw")
       }
       size <- length(file)
       partsize <- 1e8 # 100 MB
       nparts <- ceiling(size/partsize)
       
       # if file is small, there is no need for multipart upload
       if (size < partsize) {
         put_object(file = file, object = object, bucket = bucket, multipart = 
FALSE, headers = headers, ...)
         return(TRUE)
       }
       
       # function to call abort if any part fails
       abort <- function(id) delete_object(object = object, bucket = bucket, 
query = list(uploadId = id), ...)
       
       # split object into parts
       seqparts <- seq_len(partsize)
       parts <- list()
       for (i in seq_len(nparts)) {
         parts[[i]] <- head(file, partsize)
         if (i < nparts) {
           file <- file[-seqparts]
         }
       }
       
       # initialize the upload
       initialize <- post_object(file = NULL, object = object, bucket = bucket, 
query = list(uploads = ""), headers = headers, ...)
       id <- initialize[["UploadId"]]
       
       # loop over parts
       partlist <- list(Number = character(length(parts)),
           ETag = character(length(parts)))
       for (i in seq_along(parts)) {
         query <- list(partNumber = i, uploadId = id)
         r <- try(put_object(file = parts[[i]], object = object, bucket = 
bucket, 
                 multipart = FALSE, headers = headers, query = query), 
             silent = FALSE)
         if (inherits(r, "try-error")) {
           abort(id)
           stop("Multipart upload failed.")
         } else {
           partlist[["Number"]][i] <- i
           partlist[["ETag"]][i] <- attributes(r)[["ETag"]]
         }
       }
       
       # complete
       complete_parts(object = object, bucket = bucket, id = id, parts = 
partlist, ...)
       return(TRUE)
     } else {
       r <- minio.s3::s3HTTP(verb = "PUT", 
           bucket = bucket,
           path = paste0('/', object),
           headers = c(headers, list(
                   `Content-Length` = ifelse(is.character(file) && 
file.exists(file), 
                       file.size(file), length(file))
               )), 
           request_body = file,
           write_disk = NULL,
           accelerate = FALSE,
           dualstack = FALSE,
           parse_response = TRUE, 
           check_region = FALSE,
           url_style = c("path", "virtual"),
           base_url = base_url,
           verbose = getOption("verbose", FALSE),
           region = region, 
           key = key, 
           secret = secret, 
           session_token = NULL,
           use_https = FALSE)
       return(TRUE)
     }
   }
   
   put_object(
       object = "test/part-0.parquet",
       bucket = "bucket",
       file = "test",
       use_https = T
   )
   
   minio.s3::save_object(
       object = "test/part-0.parquet",
       bucket = "bucket",
       file = "test",
       use_https = F
   )
   # No error anymore!
   
   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to