pitrou commented on a change in pull request #12030:
URL: https://github.com/apache/arrow/pull/12030#discussion_r777481461
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
Review comment:
Or perhaps you can wrap the `RIconvWrapper` in a `shared_ptr`...
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
+ size_t out_bytes_used = 0;
Review comment:
Please use `int64_t` everywhere for sizes, because it conforms to the
style guide and also to avoid conversions between signed and unsigned.
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
Review comment:
Please use C++-style casts, e.g.
`reinterpret_cast<char*>(dest->mutable_data())`
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
+ size_t out_bytes_used = 0;
+
+ size_t in_bytes_left;
+ const char* in_buf;
+ int64_t n_src_bytes_in_pending = 0;
+
+ // There may be a few leftover bytes from the last call to iconv. Process
these first
+ // using the internal buffer as the source. This may also result in a
partial
+ // character left over but will always get us into the src buffer.
+ if (n_pending_ > 0) {
+ // fill the pending_ buffer with characters and call iconv() once
+ n_src_bytes_in_pending =
+ std::min<int64_t>(sizeof(pending_) - n_pending_, src->size());
+ memcpy(pending_ + n_pending_, src->data(), n_src_bytes_in_pending);
+ in_buf = pending_;
+ in_bytes_left = n_pending_ + n_src_bytes_in_pending;
+
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+
+ int64_t chars_read_out = out_buf - ((char*)dest->data());
+ out_bytes_used += chars_read_out;
+
+ int64_t chars_read_in = n_pending_ + n_src_bytes_in_pending -
in_bytes_left;
+ in_buf = (const char*)src->data() + chars_read_in - n_pending_;
+ in_bytes_left = src->size() + n_pending_ - chars_read_in;
+ } else {
+ in_buf = (const char*)src->data();
+ in_bytes_left = src->size();
+ }
+
+ // UTF-8 has a maximum of 4 bytes per character, so it's OK if we have a
few bytes
+ // left after processing all of src. If we have more than this, it means
the
+ // output buffer wasn't big enough.
+ while (in_bytes_left >= 4) {
+ int64_t new_size = std::max<int64_t>(src->size(), dest->size() * 2);
+ auto reserve_result = dest->Resize(new_size);
+ if (!reserve_result.ok()) {
+ return reserve_result;
+ }
+
+ out_buf = (char*)dest->data() + out_bytes_used;
+ out_bytes_left = dest->size() - out_bytes_used;
+
+ const char* in_buf_before = in_buf;
+ char* out_buf_before = out_buf;
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
Review comment:
Do we never check the return value of calling `iconv`? It cannot fail?
##########
File path: r/R/csv.R
##########
@@ -300,6 +300,11 @@ CsvTableReader$create <- function(file,
convert_options = CsvConvertOptions$create(),
...) {
assert_is(file, "InputStream")
+
+ if (!identical(read_options$encoding, "UTF-8")) {
Review comment:
Do we want to handle the various spellings of "utf-8" here? (lowercase,
uppercase, with or without a hyphen)
##########
File path: r/tests/testthat/test-csv.R
##########
@@ -290,6 +292,50 @@ test_that("more informative error when reading a CSV with
headers and schema", {
)
})
+test_that("CSV reader works on files with non-UTF-8 encoding", {
+ strings <- c("a", "\u00e9", "\U0001f4a9")
+ file_string <- paste0(
+ "col1,col2\n",
+ paste(strings, 1:30, sep = ",", collapse = "\n")
+ )
+ file_bytes_utf16 <- iconv(
+ file_string,
+ from = Encoding(file_string),
+ to = "UTF-16LE",
+ toRaw = TRUE
+ )[[1]]
+
+ tf <- tempfile()
+ on.exit(unlink(tf))
+ con <- file(tf, open = "wb")
+ writeBin(file_bytes_utf16, con)
+ close(con)
+
+ fs <- LocalFileSystem$create()
+ reader <- CsvTableReader$create(
+ fs$OpenInputStream(tf),
+ read_options = CsvReadOptions$create(encoding = "UTF-16LE")
+ )
+
+ table <- reader$Read()
+
+ # check that the CSV reader didn't create a binary column because of
+ # invalid bytes
+ expect_true(table$col1$type == string())
+
+ # check that the bytes are correct
+ expect_identical(
+ lapply(as.vector(table$col1$cast(binary())), as.raw),
+ rep(
+ list(as.raw(0x61), as.raw(c(0xc3, 0xa9)), as.raw(c(0xf0, 0x9f, 0x92,
0xa9))),
+ 10
+ )
+ )
+
+ # check that the strings are correct
+ expect_identical(as.vector(table$col1), rep(strings, 10))
+})
+
Review comment:
Can you add a check with a file that has invalid encoded contents?
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
+ size_t out_bytes_used = 0;
+
+ size_t in_bytes_left;
+ const char* in_buf;
+ int64_t n_src_bytes_in_pending = 0;
+
+ // There may be a few leftover bytes from the last call to iconv. Process
these first
+ // using the internal buffer as the source. This may also result in a
partial
+ // character left over but will always get us into the src buffer.
+ if (n_pending_ > 0) {
+ // fill the pending_ buffer with characters and call iconv() once
+ n_src_bytes_in_pending =
+ std::min<int64_t>(sizeof(pending_) - n_pending_, src->size());
+ memcpy(pending_ + n_pending_, src->data(), n_src_bytes_in_pending);
+ in_buf = pending_;
+ in_bytes_left = n_pending_ + n_src_bytes_in_pending;
+
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+
+ int64_t chars_read_out = out_buf - ((char*)dest->data());
+ out_bytes_used += chars_read_out;
+
+ int64_t chars_read_in = n_pending_ + n_src_bytes_in_pending -
in_bytes_left;
+ in_buf = (const char*)src->data() + chars_read_in - n_pending_;
+ in_bytes_left = src->size() + n_pending_ - chars_read_in;
+ } else {
+ in_buf = (const char*)src->data();
+ in_bytes_left = src->size();
+ }
+
+ // UTF-8 has a maximum of 4 bytes per character, so it's OK if we have a
few bytes
+ // left after processing all of src. If we have more than this, it means
the
+ // output buffer wasn't big enough.
+ while (in_bytes_left >= 4) {
+ int64_t new_size = std::max<int64_t>(src->size(), dest->size() * 2);
+ auto reserve_result = dest->Resize(new_size);
+ if (!reserve_result.ok()) {
+ return reserve_result;
+ }
+
+ out_buf = (char*)dest->data() + out_bytes_used;
+ out_bytes_left = dest->size() - out_bytes_used;
+
+ const char* in_buf_before = in_buf;
+ char* out_buf_before = out_buf;
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+ int64_t bytes_read_out = out_buf - out_buf_before;
+ int64_t bytes_read_in = in_buf - in_buf_before;
+
+ if (bytes_read_out <= 0 || bytes_read_in <= 0) {
Review comment:
Is it possible for these quantities to be negative?
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
+ size_t out_bytes_used = 0;
+
+ size_t in_bytes_left;
+ const char* in_buf;
+ int64_t n_src_bytes_in_pending = 0;
+
+ // There may be a few leftover bytes from the last call to iconv. Process
these first
+ // using the internal buffer as the source. This may also result in a
partial
+ // character left over but will always get us into the src buffer.
+ if (n_pending_ > 0) {
+ // fill the pending_ buffer with characters and call iconv() once
+ n_src_bytes_in_pending =
+ std::min<int64_t>(sizeof(pending_) - n_pending_, src->size());
+ memcpy(pending_ + n_pending_, src->data(), n_src_bytes_in_pending);
+ in_buf = pending_;
+ in_bytes_left = n_pending_ + n_src_bytes_in_pending;
+
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+
+ int64_t chars_read_out = out_buf - ((char*)dest->data());
+ out_bytes_used += chars_read_out;
+
+ int64_t chars_read_in = n_pending_ + n_src_bytes_in_pending -
in_bytes_left;
+ in_buf = (const char*)src->data() + chars_read_in - n_pending_;
+ in_bytes_left = src->size() + n_pending_ - chars_read_in;
+ } else {
+ in_buf = (const char*)src->data();
+ in_bytes_left = src->size();
+ }
+
+ // UTF-8 has a maximum of 4 bytes per character, so it's OK if we have a
few bytes
+ // left after processing all of src. If we have more than this, it means
the
Review comment:
There could also be a few bytes left if `new_size` below was too small.
Don't we want to consume as much as possible?
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
Review comment:
Pre-allocating `src->size()` bytes sounds like a better heuristic (or
perhaps even a bit more).
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
Review comment:
Actually, it would perhaps be even better to use a `BufferBuilder` and
call `Reserve` accordingly. It will handle overallocation for you.
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
+ size_t out_bytes_used = 0;
+
+ size_t in_bytes_left;
+ const char* in_buf;
+ int64_t n_src_bytes_in_pending = 0;
+
+ // There may be a few leftover bytes from the last call to iconv. Process
these first
+ // using the internal buffer as the source. This may also result in a
partial
+ // character left over but will always get us into the src buffer.
+ if (n_pending_ > 0) {
+ // fill the pending_ buffer with characters and call iconv() once
+ n_src_bytes_in_pending =
+ std::min<int64_t>(sizeof(pending_) - n_pending_, src->size());
+ memcpy(pending_ + n_pending_, src->data(), n_src_bytes_in_pending);
+ in_buf = pending_;
+ in_bytes_left = n_pending_ + n_src_bytes_in_pending;
+
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+
+ int64_t chars_read_out = out_buf - ((char*)dest->data());
+ out_bytes_used += chars_read_out;
+
+ int64_t chars_read_in = n_pending_ + n_src_bytes_in_pending -
in_bytes_left;
+ in_buf = (const char*)src->data() + chars_read_in - n_pending_;
+ in_bytes_left = src->size() + n_pending_ - chars_read_in;
+ } else {
+ in_buf = (const char*)src->data();
+ in_bytes_left = src->size();
+ }
+
+ // UTF-8 has a maximum of 4 bytes per character, so it's OK if we have a
few bytes
+ // left after processing all of src. If we have more than this, it means
the
+ // output buffer wasn't big enough.
+ while (in_bytes_left >= 4) {
+ int64_t new_size = std::max<int64_t>(src->size(), dest->size() * 2);
+ auto reserve_result = dest->Resize(new_size);
+ if (!reserve_result.ok()) {
+ return reserve_result;
+ }
+
+ out_buf = (char*)dest->data() + out_bytes_used;
+ out_bytes_left = dest->size() - out_bytes_used;
+
+ const char* in_buf_before = in_buf;
+ char* out_buf_before = out_buf;
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+ int64_t bytes_read_out = out_buf - out_buf_before;
+ int64_t bytes_read_in = in_buf - in_buf_before;
+
+ if (bytes_read_out <= 0 || bytes_read_in <= 0) {
+ // This should not happen, but if it does, we want to abort to make
sure
+ // the loop doesn't continue forever.
+ return arrow::Status::IOError(
+ "Call to iconv() read or appended zero output bytes");
+ }
+
+ out_bytes_used += bytes_read_out;
+ }
+
+ // Keep the leftover characters until the next call to the function
+ n_pending_ = in_bytes_left;
+ if (in_bytes_left > 0) {
+ memcpy(pending_, in_buf, in_bytes_left);
+ }
+
+ // Shrink the output buffer to only the size used
+ auto resize_result = dest->Resize(out_bytes_used);
Review comment:
Can use the `RETURN_NOT_OK` macro:
```c++
RETURN_NOT_OK(dest->Resize(out_bytes_used));
```
##########
File path: r/src/io.cpp
##########
@@ -178,4 +180,134 @@ void io___BufferOutputStream__Write(
StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
}
+// TransformInputStream::TransformFunc wrapper
+
+class RIconvWrapper {
+ public:
+ RIconvWrapper(std::string to, std::string from)
+ : handle_(Riconv_open(to.c_str(), from.c_str())) {
+ if (handle_ == ((void*)-1)) {
+ cpp11::stop("Can't convert encoding from '%s' to '%s'", from.c_str(),
to.c_str());
+ }
+ }
+
+ size_t iconv(const char** inbuf, size_t* inbytesleft, char** outbuf,
+ size_t* outbytesleft) {
+ return Riconv(handle_, inbuf, inbytesleft, outbuf, outbytesleft);
+ }
+
+ ~RIconvWrapper() {
+ if (handle_ != ((void*)-1)) {
+ Riconv_close(handle_);
+ }
+ }
+
+ protected:
+ void* handle_;
+};
+
+struct ReencodeUTF8TransformFunctionWrapper {
+ explicit ReencodeUTF8TransformFunctionWrapper(std::string from)
+ : from_(from), iconv_("UTF-8", from), n_pending_(0) {}
+
+ // This may get copied and we need a fresh RIconvWrapper for each copy.
+ ReencodeUTF8TransformFunctionWrapper(const
ReencodeUTF8TransformFunctionWrapper& ref)
+ : ReencodeUTF8TransformFunctionWrapper(ref.from_) {}
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> operator()(
+ const std::shared_ptr<arrow::Buffer>& src) {
+ ARROW_ASSIGN_OR_RAISE(auto dest, arrow::AllocateResizableBuffer(32));
+
+ size_t out_bytes_left = dest->size();
+ char* out_buf = (char*)dest->data();
+ size_t out_bytes_used = 0;
+
+ size_t in_bytes_left;
+ const char* in_buf;
+ int64_t n_src_bytes_in_pending = 0;
+
+ // There may be a few leftover bytes from the last call to iconv. Process
these first
+ // using the internal buffer as the source. This may also result in a
partial
+ // character left over but will always get us into the src buffer.
+ if (n_pending_ > 0) {
+ // fill the pending_ buffer with characters and call iconv() once
+ n_src_bytes_in_pending =
+ std::min<int64_t>(sizeof(pending_) - n_pending_, src->size());
+ memcpy(pending_ + n_pending_, src->data(), n_src_bytes_in_pending);
+ in_buf = pending_;
+ in_bytes_left = n_pending_ + n_src_bytes_in_pending;
+
+ iconv_.iconv(&in_buf, &in_bytes_left, &out_buf, &out_bytes_left);
+
+ int64_t chars_read_out = out_buf - ((char*)dest->data());
Review comment:
Please use C++-style casts, e.g.
`reinterpret_cast<char*>(dest->mutable_data())`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]