[ 
https://issues.apache.org/jira/browse/ARROW-2319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442282#comment-16442282
 ] 

ASF GitHub Bot commented on ARROW-2319:
---------------------------------------

xhochy closed pull request #1903: ARROW-2319: [C++] Add BufferedOutputStream class
URL: https://github.com/apache/arrow/pull/1903
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 5fd9256702..da7e24d2d3 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -30,6 +30,7 @@ set(ARROW_SRCS
   type.cc
   visitor.cc
 
+  io/buffered.cc
   io/file.cc
   io/interfaces.cc
   io/memory.cc
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index a7e569870b..65221d097f 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -18,6 +18,7 @@
 # ----------------------------------------------------------------------
 # arrow_io : Arrow IO interfaces
 
+ADD_ARROW_TEST(io-buffered-test)
 ADD_ARROW_TEST(io-file-test)
 
 if (ARROW_HDFS AND NOT ARROW_BOOST_HEADER_ONLY)
@@ -26,11 +27,13 @@ endif()
 
 ADD_ARROW_TEST(io-memory-test)
 
+ADD_ARROW_BENCHMARK(io-file-benchmark)
 ADD_ARROW_BENCHMARK(io-memory-benchmark)
 
 # Headers: top level
 install(FILES
   api.h
+  buffered.h
   file.h
   hdfs.h
   interfaces.h
diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc
new file mode 100644
index 0000000000..7ff8ad6804
--- /dev/null
+++ b/cpp/src/arrow/io/buffered.cc
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/buffered.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+
+namespace arrow {
+namespace io {
+
+// ----------------------------------------------------------------------
+// BufferedOutputStream implementation
+
+class BufferedOutputStream::Impl {
+ public:
+  explicit Impl(std::shared_ptr<OutputStream> raw)
+      : raw_(std::move(raw)),
+        is_open_(true),
+        buffer_(std::string(BUFFER_SIZE, '\0')),
+        // &buffer_[0] gives mutable access to the string's storage; writing
+        // through the const pointer returned by data() is not allowed.
+        buffer_data_(&buffer_[0]),
+        buffer_pos_(0),
+        raw_pos_(-1) {}
+
+  // Flush pending data and close the raw stream on destruction.  Close() is
+  // deliberately called outside DCHECK(): assertion macros may not evaluate
+  // their argument in release builds, which would skip the flush and close.
+  ~Impl() {
+    Status st = Close();
+    DCHECK(st.ok());
+  }
+
+  // Flush buffered bytes, then close the raw stream.  Calling Close() on an
+  // already-closed stream is a no-op.  The stream is marked closed even if
+  // the flush fails; an error from closing the raw stream takes precedence
+  // over a flush error.
+  Status Close() {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (is_open_) {
+      Status st = FlushUnlocked();
+      is_open_ = false;
+      RETURN_NOT_OK(raw_->Close());
+      return st;
+    }
+    return Status::OK();
+  }
+
+  // Logical position: the raw stream position (queried lazily and cached in
+  // raw_pos_) plus the number of bytes still held in the buffer.
+  Status Tell(int64_t* position) const {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (raw_pos_ == -1) {
+      RETURN_NOT_OK(raw_->Tell(&raw_pos_));
+      DCHECK_GE(raw_pos_, 0);
+    }
+    *position = raw_pos_ + buffer_pos_;
+    return Status::OK();
+  }
+
+  // Append `nbytes` bytes from `data`.  Writes that fit in the remaining
+  // buffer space are only copied into the buffer.  Otherwise the buffer is
+  // flushed first, and writes of at least BUFFER_SIZE bytes then go straight
+  // to the raw stream.
+  Status Write(const void* data, int64_t nbytes) {
+    std::lock_guard<std::mutex> guard(lock_);
+    if (nbytes < 0) {
+      return Status::Invalid("write count should be >= 0");
+    }
+    if (!is_open_) {
+      // Without this check the bytes would sit in the buffer and be lost.
+      return Status::IOError("cannot write to a closed BufferedOutputStream");
+    }
+    if (nbytes == 0) {
+      return Status::OK();
+    }
+    // Compare against the remaining space so that a huge nbytes cannot
+    // overflow the sum nbytes + buffer_pos_.
+    if (nbytes >= BUFFER_SIZE - buffer_pos_) {
+      RETURN_NOT_OK(FlushUnlocked());
+      DCHECK_EQ(buffer_pos_, 0);
+      if (nbytes >= BUFFER_SIZE) {
+        // Direct write.  It moves the raw position, so the cached value must
+        // be dropped here as well: FlushUnlocked() only invalidates it when
+        // the buffer was non-empty.
+        raw_pos_ = -1;
+        return raw_->Write(data, nbytes);
+      }
+    }
+    DCHECK_LE(buffer_pos_ + nbytes, BUFFER_SIZE);
+    std::memcpy(buffer_data_ + buffer_pos_, data, nbytes);
+    buffer_pos_ += nbytes;
+    return Status::OK();
+  }
+
+  // Write any buffered bytes to the raw stream.  Caller must hold lock_.
+  Status FlushUnlocked() {
+    if (buffer_pos_ > 0) {
+      // Invalidate cached raw pos
+      raw_pos_ = -1;
+      RETURN_NOT_OK(raw_->Write(buffer_data_, buffer_pos_));
+      buffer_pos_ = 0;
+    }
+    return Status::OK();
+  }
+
+  Status Flush() {
+    std::lock_guard<std::mutex> guard(lock_);
+    return FlushUnlocked();
+  }
+
+  std::shared_ptr<OutputStream> raw() const { return raw_; }
+
+ private:
+  // This size chosen so that memcpy() remains cheap
+  static const int64_t BUFFER_SIZE = 4096;
+
+  std::shared_ptr<OutputStream> raw_;
+  bool is_open_;
+  std::string buffer_;
+  // Mutable pointer into buffer_'s storage (declared after buffer_, so it is
+  // initialized after it).
+  char* buffer_data_;
+  // Number of valid bytes currently held in buffer_.
+  int64_t buffer_pos_;
+  // Cached raw stream position, or -1 when unknown.
+  mutable int64_t raw_pos_;
+  mutable std::mutex lock_;
+};
+
+BufferedOutputStream::BufferedOutputStream(std::shared_ptr<OutputStream> raw)
+    : impl_(new BufferedOutputStream::Impl(std::move(raw))) {}
+
+// Defined out of line, where Impl is a complete type, so that
+// std::unique_ptr<Impl> can destroy it.
+BufferedOutputStream::~BufferedOutputStream() {}
+
+// The OutputStream methods below forward to Impl, which serializes them
+// with its internal mutex.
+// NOTE(review): Impl::Flush() is not reachable from this class because the
+// header declares no Flush() override; confirm whether it should forward.
+
+Status BufferedOutputStream::Close() { return impl_->Close(); }
+
+Status BufferedOutputStream::Tell(int64_t* position) const {
+  return impl_->Tell(position);
+}
+
+Status BufferedOutputStream::Write(const void* data, int64_t nbytes) {
+  return impl_->Write(data, nbytes);
+}
+
+std::shared_ptr<OutputStream> BufferedOutputStream::raw() const { return impl_->raw(); }
+
+}  // namespace io
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h
new file mode 100644
index 0000000000..deb344cb7c
--- /dev/null
+++ b/cpp/src/arrow/io/buffered.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Buffered stream implementations
+
+#ifndef ARROW_IO_BUFFERED_H
+#define ARROW_IO_BUFFERED_H
+
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Status;
+
+namespace io {
+
+class ARROW_EXPORT BufferedOutputStream : public OutputStream {
+ public:
+  /// \brief Create a buffered output stream wrapping the given output stream.
+  ///
+  /// Small writes are accumulated in an internal buffer and forwarded to
+  /// `raw` in larger chunks.
+  explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw);
+
+  ~BufferedOutputStream() override;
+
+  /// \name OutputStream interface
+  /// @{
+
+  /// \brief Close the buffered output stream.  This implicitly closes the
+  /// underlying raw output stream.
+  Status Close() override;
+
+  /// \brief Return the logical stream position, including buffered bytes.
+  Status Tell(int64_t* position) const override;
+
+  /// \brief Write bytes to the stream.  Thread-safe.
+  Status Write(const void* data, int64_t nbytes) override;
+
+  /// @}
+
+  /// \brief Return the underlying raw output stream.
+  std::shared_ptr<OutputStream> raw() const;
+
+ private:
+  class ARROW_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace io
+}  // namespace arrow
+
+#endif  // ARROW_IO_BUFFERED_H
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 02cc4dbbd6..008f2b2e22 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -388,8 +388,10 @@ class OSFile {
 
   Status Close() {
     if (is_open_) {
-      RETURN_NOT_OK(FileClose(fd_));
+      // Even if closing fails, the fd will likely be closed (perhaps it's
+      // already closed).
       is_open_ = false;
+      RETURN_NOT_OK(FileClose(fd_));
     }
     return Status::OK();
   }
diff --git a/cpp/src/arrow/io/io-buffered-test.cc b/cpp/src/arrow/io/io-buffered-test.cc
new file mode 100644
index 0000000000..829a2d52fc
--- /dev/null
+++ b/cpp/src/arrow/io/io-buffered-test.cc
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <random>
+#include <string>
+#include <valarray>
+
+#include <gtest/gtest.h>
+
+#include "arrow/io/buffered.h"
+#include "arrow/io/file.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/test-common.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+
+namespace arrow {
+namespace io {
+
+// Return `nbytes` pseudo-random bytes.  The engine is default-seeded, so the
+// same data is produced on every call for a given size.
+static std::string GenerateRandomData(size_t nbytes) {
+  // MSVC doesn't accept uint8_t for std::independent_bits_engine<>
+  typedef unsigned long UInt;  // NOLINT
+  std::independent_bits_engine<std::default_random_engine, 8 * sizeof(UInt), UInt>
+      engine;
+
+  // One extra word so that the copy below never reads past the vector.
+  std::vector<UInt> data(nbytes / sizeof(UInt) + 1);
+  std::generate(std::begin(data), std::end(data), std::ref(engine));
+  return std::string(reinterpret_cast<char*>(data.data()), nbytes);
+}
+
+// Fixture owning a scratch file path that is removed before and after each
+// test.
+class FileTestFixture : public ::testing::Test {
+ public:
+  void SetUp() override {
+    path_ = "arrow-test-io-buffered-output-stream.txt";
+    EnsureFileDeleted();
+  }
+
+  void TearDown() override { EnsureFileDeleted(); }
+
+  // Remove the scratch file if it is present.
+  void EnsureFileDeleted() {
+    if (!FileExists(path_)) {
+      return;
+    }
+    std::remove(path_.c_str());
+  }
+
+ protected:
+  std::string path_;
+};
+
+// ----------------------------------------------------------------------
+// File output tests
+
+class TestBufferedOutputStream : public FileTestFixture {
+ public:
+  // (Re)open path_ as a FileOutputStream wrapped in a BufferedOutputStream.
+  // Any previous stream is released first.  With `append`, the raw file
+  // position is moved to the end of the existing contents.
+  void OpenBuffered(bool append = false) {
+    stream_.reset();
+    std::shared_ptr<FileOutputStream> file;
+    ASSERT_OK(FileOutputStream::Open(path_, append, &file));
+    fd_ = file->file_descriptor();
+    if (append) {
+      // Workaround for ARROW-2466 ("append" flag doesn't set file pos)
+#if defined(_MSC_VER)
+      _lseeki64(fd_, 0, SEEK_END);
+#else
+      lseek(fd_, 0, SEEK_END);
+#endif
+    }
+    stream_ = std::make_shared<BufferedOutputStream>(std::move(file));
+  }
+
+  // Write `datastr` to stream_ in chunks whose sizes cycle through `sizes`.
+  // The tail that does not fill a whole chunk is written last.
+  void WriteChunkwise(const std::string& datastr, const std::valarray<int64_t>& sizes) {
+    // An empty `sizes` would make the loop below dereference past the end.
+    ASSERT_GT(sizes.size(), 0U);
+    const char* data = datastr.data();
+    const int64_t data_size = static_cast<int64_t>(datastr.size());
+    int64_t data_pos = 0;
+    auto size_it = std::begin(sizes);
+
+    // Write datastr, chunk by chunk, until exhausted
+    while (true) {
+      int64_t size = *size_it++;
+      if (size_it == std::end(sizes)) {
+        size_it = std::begin(sizes);
+      }
+      if (data_pos + size > data_size) {
+        break;
+      }
+      ASSERT_OK(stream_->Write(data + data_pos, size));
+      data_pos += size;
+    }
+    ASSERT_OK(stream_->Write(data + data_pos, data_size - data_pos));
+  }
+
+  // Assert that stream_->Tell() succeeds and reports `expected`.
+  void AssertTell(int64_t expected) {
+    int64_t actual = -1;
+    ASSERT_OK(stream_->Tell(&actual));
+    ASSERT_EQ(expected, actual);
+  }
+
+ protected:
+  // Descriptor of the raw file behind stream_ (-1 until OpenBuffered runs).
+  int fd_ = -1;
+  std::shared_ptr<OutputStream> stream_;
+};
+
+// Dropping the only reference to the buffered stream closes the raw file.
+TEST_F(TestBufferedOutputStream, DestructorClosesFile) {
+  OpenBuffered();
+  ASSERT_FALSE(FileIsClosed(fd_));
+  stream_.reset();
+  ASSERT_TRUE(FileIsClosed(fd_));
+}
+
+// Close() closes the raw file, and a second Close() is a successful no-op.
+TEST_F(TestBufferedOutputStream, ExplicitCloseClosesFile) {
+  OpenBuffered();
+  ASSERT_FALSE(FileIsClosed(fd_));
+  ASSERT_OK(stream_->Close());
+  ASSERT_TRUE(FileIsClosed(fd_));
+  // Idempotency
+  ASSERT_OK(stream_->Close());
+  ASSERT_TRUE(FileIsClosed(fd_));
+}
+
+// A negative byte count is rejected with an Invalid status.
+TEST_F(TestBufferedOutputStream, InvalidWrites) {
+  OpenBuffered();
+
+  const char* data = "";
+  ASSERT_RAISES(Invalid, stream_->Write(data, -1));
+}
+
+// Writes much smaller than the buffer reach the file once the stream is
+// closed.
+TEST_F(TestBufferedOutputStream, TinyWrites) {
+  OpenBuffered();
+
+  const std::string datastr = "1234568790";
+  const char* data = datastr.data();
+
+  ASSERT_OK(stream_->Write(data, 2));
+  ASSERT_OK(stream_->Write(data + 2, 6));
+  ASSERT_OK(stream_->Close());
+
+  AssertFileContents(path_, datastr.substr(0, 8));
+}
+
+// Many writes of 1-13 bytes whose total spans many buffer flushes.
+TEST_F(TestBufferedOutputStream, SmallWrites) {
+  OpenBuffered();
+
+  // Data here should be larger than BufferedOutputStream's buffer size
+  const std::string data = GenerateRandomData(200000);
+  const std::valarray<int64_t> sizes = {1, 1, 2, 3, 5, 8, 13};
+
+  WriteChunkwise(data, sizes);
+  ASSERT_OK(stream_->Close());
+
+  AssertFileContents(path_, data);
+}
+
+// Tiny writes interleaved with a 70000-byte write, so buffered and
+// unbuffered paths alternate.
+TEST_F(TestBufferedOutputStream, MixedWrites) {
+  OpenBuffered();
+
+  const std::string data = GenerateRandomData(300000);
+  const std::valarray<int64_t> sizes = {1, 1, 2, 3, 70000};
+
+  WriteChunkwise(data, sizes);
+  ASSERT_OK(stream_->Close());
+
+  AssertFileContents(path_, data);
+}
+
+// Writes of 10000 bytes or more; only the final remainder may be smaller.
+TEST_F(TestBufferedOutputStream, LargeWrites) {
+  OpenBuffered();
+
+  const std::string data = GenerateRandomData(800000);
+  const std::valarray<int64_t> sizes = {10000, 60000, 70000};
+
+  WriteChunkwise(data, sizes);
+  ASSERT_OK(stream_->Close());
+
+  AssertFileContents(path_, data);
+}
+
+// Tell() includes buffered bytes, follows the file end when appending, and
+// restarts at 0 when the file is reopened without append.
+// NOTE(review): no step calls Tell() right after a large write that was issued
+// while the buffer was empty; consider adding such a case.
+TEST_F(TestBufferedOutputStream, Tell) {
+  OpenBuffered();
+
+  AssertTell(0);
+  AssertTell(0);
+  WriteChunkwise(std::string(100, 'x'), {1, 1, 2, 3, 5, 8});
+  AssertTell(100);
+  WriteChunkwise(std::string(100000, 'x'), {60000});
+  AssertTell(100100);
+
+  ASSERT_OK(stream_->Close());
+
+  OpenBuffered(true /* append */);
+  AssertTell(100100);
+  WriteChunkwise(std::string(90, 'x'), {1, 1, 2, 3, 5, 8});
+  AssertTell(100190);
+
+  ASSERT_OK(stream_->Close());
+
+  OpenBuffered();
+  AssertTell(0);
+}
+
+// Reopening the file without append discards its previous contents.
+TEST_F(TestBufferedOutputStream, TruncatesFile) {
+  OpenBuffered();
+
+  const std::string datastr = "1234568790";
+  ASSERT_OK(stream_->Write(datastr.data(), datastr.size()));
+  ASSERT_OK(stream_->Close());
+
+  AssertFileContents(path_, datastr);
+
+  OpenBuffered();
+  AssertFileContents(path_, "");
+}
+
+}  // namespace io
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/io-file-benchmark.cc b/cpp/src/arrow/io/io-file-benchmark.cc
new file mode 100644
index 0000000000..bb35b080d1
--- /dev/null
+++ b/cpp/src/arrow/io/io-file-benchmark.cc
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/api.h"
+#include "arrow/io/buffered.h"
+#include "arrow/io/file.h"
+#include "arrow/test-util.h"
+
+#include "benchmark/benchmark.h"
+
+#include <algorithm>
+#include <iostream>
+#include <valarray>
+
+namespace arrow {
+
+// XXX Writing to /dev/null is irrealistic as the kernel likely doesn't
+// copy the data at all.  Use a socketpair instead?
+std::string GetNullFile() { return "/dev/null"; }
+
+const std::valarray<int64_t> small_sizes = {8, 24, 33, 1, 32, 192, 16, 40};
+const std::valarray<int64_t> large_sizes = {8192, 100000};
+
+// Repeatedly write chunks of the given sizes to `stream` and report the
+// throughput.  Every chunk is a prefix of one buffer of 'x' bytes sized to
+// the largest chunk.  `sizes` must be non-empty.
+static void BenchmarkStreamingWrites(
+    benchmark::State& state,  // NOLINT non-const reference
+    const std::valarray<int64_t>& sizes, io::OutputStream* stream) {
+  const int64_t max_size = *std::max_element(std::begin(sizes), std::end(sizes));
+  const std::string datastr(static_cast<size_t>(max_size), 'x');
+  const void* data = datastr.data();
+  const int64_t sum_sizes = sizes.sum();
+
+  while (state.KeepRunning()) {
+    for (const int64_t size : sizes) {
+      ABORT_NOT_OK(stream->Write(data, size));
+    }
+  }
+  state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * sum_sizes);
+}
+
+// Open the null device for writing, optionally wrapped in a
+// BufferedOutputStream.  Open failures are handled by ABORT_NOT_OK.
+static std::shared_ptr<io::OutputStream> OpenNullStream(bool buffered) {
+  std::shared_ptr<io::OutputStream> stream;
+  ABORT_NOT_OK(io::FileOutputStream::Open(GetNullFile(), &stream));
+  if (buffered) {
+    stream = std::make_shared<io::BufferedOutputStream>(std::move(stream));
+  }
+  return stream;
+}
+
+// Each benchmark closes its stream explicitly once measuring is done, so
+// buffered bytes are flushed and close errors are reported rather than left
+// to destructors.
+
+static void BM_FileOutputStreamSmallWrites(
+    benchmark::State& state) {  // NOLINT non-const reference
+  auto stream = OpenNullStream(false /* buffered */);
+  BenchmarkStreamingWrites(state, small_sizes, stream.get());
+  ABORT_NOT_OK(stream->Close());
+}
+
+static void BM_FileOutputStreamLargeWrites(
+    benchmark::State& state) {  // NOLINT non-const reference
+  auto stream = OpenNullStream(false /* buffered */);
+  BenchmarkStreamingWrites(state, large_sizes, stream.get());
+  ABORT_NOT_OK(stream->Close());
+}
+
+static void BM_BufferedOutputStreamSmallWrites(
+    benchmark::State& state) {  // NOLINT non-const reference
+  auto stream = OpenNullStream(true /* buffered */);
+  BenchmarkStreamingWrites(state, small_sizes, stream.get());
+  ABORT_NOT_OK(stream->Close());
+}
+
+static void BM_BufferedOutputStreamLargeWrites(
+    benchmark::State& state) {  // NOLINT non-const reference
+  auto stream = OpenNullStream(true /* buffered */);
+  BenchmarkStreamingWrites(state, large_sizes, stream.get());
+  ABORT_NOT_OK(stream->Close());
+}
+
+// Unbuffered FileOutputStream baselines followed by the BufferedOutputStream
+// variants, each with the small and large write patterns.  Every benchmark
+// runs 2 repetitions with MinTime(1.0).
+BENCHMARK(BM_FileOutputStreamSmallWrites)->Repetitions(2)->MinTime(1.0);
+
+BENCHMARK(BM_FileOutputStreamLargeWrites)->Repetitions(2)->MinTime(1.0);
+
+BENCHMARK(BM_BufferedOutputStreamSmallWrites)->Repetitions(2)->MinTime(1.0);
+
+BENCHMARK(BM_BufferedOutputStreamLargeWrites)->Repetitions(2)->MinTime(1.0);
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 53218ca857..098e82f437 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -21,17 +21,12 @@
 #include <cstdio>
 #include <cstdlib>
 #include <cstring>
-#include <fstream>  // IWYU pragma: keep
 #include <memory>
 #include <sstream>  // IWYU pragma: keep
 #include <string>
 #include <thread>
 #include <vector>
 
-#ifndef _MSC_VER
-#include <fcntl.h>
-#endif
-
 #include <gtest/gtest.h>
 
 #include "arrow/buffer.h"
@@ -45,38 +40,6 @@
 namespace arrow {
 namespace io {
 
-static bool FileExists(const std::string& path) {
-  return std::ifstream(path.c_str()).good();
-}
-
-#if defined(_MSC_VER)
-void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
-                         const wchar_t* source_file, unsigned int source_line,
-                         uintptr_t reserved) {
-  wprintf(L"Invalid parameter in function %s. Source: %s line %d expression 
%s", func,
-          source_file, source_line, expr);
-}
-#endif
-
-static bool FileIsClosed(int fd) {
-#if defined(_MSC_VER)
-  // Disables default behavior on wrong params which causes the application to 
crash
-  // https://msdn.microsoft.com/en-us/library/ksazx244.aspx
-  _set_invalid_parameter_handler(InvalidParamHandler);
-
-  // Disables possible assertion alert box on invalid input arguments
-  _CrtSetReportMode(_CRT_ASSERT, 0);
-
-  int ret = static_cast<int>(_close(fd));
-  return (ret == -1);
-#else
-  if (-1 != fcntl(fd, F_GETFD)) {
-    return false;
-  }
-  return errno == EBADF;
-#endif
-}
-
 class FileTestFixture : public ::testing::Test {
  public:
   void SetUp() {
@@ -155,12 +118,7 @@ TEST_F(TestFileOutputStream, Close) {
   // Idempotent
   ASSERT_OK(file_->Close());
 
-  std::shared_ptr<ReadableFile> rd_file;
-  ASSERT_OK(ReadableFile::Open(path_, &rd_file));
-
-  int64_t size = 0;
-  ASSERT_OK(rd_file->GetSize(&size));
-  ASSERT_EQ(strlen(data), size);
+  AssertFileContents(path_, data);
 
   ASSERT_OK(stream_->Write(data, strlen(data)));
 
@@ -171,9 +129,7 @@ TEST_F(TestFileOutputStream, Close) {
   // Idempotent
   ASSERT_OK(stream_->Close());
 
-  ASSERT_OK(ReadableFile::Open(path_, &rd_file));
-  ASSERT_OK(rd_file->GetSize(&size));
-  ASSERT_EQ(strlen(data), size);
+  AssertFileContents(path_, data);
 }
 
 TEST_F(TestFileOutputStream, InvalidWrites) {
@@ -215,12 +171,7 @@ TEST_F(TestFileOutputStream, TruncatesNewFile) {
   ASSERT_OK(FileOutputStream::Open(path_, &file_));
   ASSERT_OK(file_->Close());
 
-  std::shared_ptr<ReadableFile> rd_file;
-  ASSERT_OK(ReadableFile::Open(path_, &rd_file));
-
-  int64_t size;
-  ASSERT_OK(rd_file->GetSize(&size));
-  ASSERT_EQ(0, size);
+  AssertFileContents(path_, "");
 
   ASSERT_OK(FileOutputStream::Open(path_, &stream_));
 
@@ -230,9 +181,7 @@ TEST_F(TestFileOutputStream, TruncatesNewFile) {
   ASSERT_OK(FileOutputStream::Open(path_, &stream_));
   ASSERT_OK(stream_->Close());
 
-  ASSERT_OK(ReadableFile::Open(path_, &rd_file));
-  ASSERT_OK(rd_file->GetSize(&size));
-  ASSERT_EQ(0, size);
+  AssertFileContents(path_, "");
 }
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
index a4974b7752..fa9145259b 100644
--- a/cpp/src/arrow/io/test-common.h
+++ b/cpp/src/arrow/io/test-common.h
@@ -20,10 +20,15 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <fstream>  // IWYU pragma: keep
 #include <memory>
 #include <string>
 #include <vector>
 
+#ifndef _MSC_VER
+#include <fcntl.h>
+#endif
+
 #if defined(__MINGW32__)  // MinGW
 // nothing
 #elif defined(_MSC_VER)  // Visual Studio
@@ -41,6 +46,56 @@
 namespace arrow {
 namespace io {
 
+// Assert that the file at `path` can be opened and holds exactly `contents`.
+static inline void AssertFileContents(const std::string& path,
+                                      const std::string& contents) {
+  std::shared_ptr<ReadableFile> rf;
+  int64_t size = -1;
+
+  ASSERT_OK(ReadableFile::Open(path, &rf));
+  ASSERT_OK(rf->GetSize(&size));
+  // Compare as int64_t to avoid a signed/unsigned comparison inside
+  // ASSERT_EQ; expected value first, per gtest convention.
+  ASSERT_EQ(static_cast<int64_t>(contents.size()), size);
+
+  std::shared_ptr<Buffer> actual_data;
+  ASSERT_OK(rf->Read(size, &actual_data));
+  ASSERT_TRUE(actual_data->Equals(Buffer(contents)));
+}
+
+// Return true when `path` can be opened for reading.
+static inline bool FileExists(const std::string& path) {
+  std::ifstream probe(path.c_str());
+  return probe.good();
+}
+
+#if defined(_MSC_VER)
+// CRT invalid-parameter handler that logs the report instead of terminating,
+// installed by FileIsClosed() before probing a possibly-closed descriptor.
+// %ls / %u match the wchar_t* and unsigned int arguments portably.
+static inline void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
+                                       const wchar_t* source_file,
+                                       unsigned int source_line,
+                                       uintptr_t /*reserved*/) {
+  wprintf(L"Invalid parameter in function '%ls'. Source: '%ls' line %u "
+          L"expression '%ls'\n",
+          func, source_file, source_line, expr);
+}
+#endif
+
+// Return true if `fd` does not refer to an open file descriptor.
+static inline bool FileIsClosed(int fd) {
+#if defined(_MSC_VER)
+  // Disables default behavior on wrong params which causes the application to crash
+  // https://msdn.microsoft.com/en-us/library/ksazx244.aspx
+  _set_invalid_parameter_handler(InvalidParamHandler);
+
+  // Disables possible assertion alert box on invalid input arguments
+  _CrtSetReportMode(_CRT_ASSERT, 0);
+
+  // Probe with _dup() rather than _close() so that an open descriptor is
+  // left untouched by the check; the duplicate is closed again right away.
+  int new_fd = _dup(fd);
+  if (new_fd == -1) {
+    return errno == EBADF;
+  }
+  _close(new_fd);
+  return false;
+#else
+  // fcntl(F_GETFD) succeeds only on an open descriptor; EBADF means closed.
+  if (-1 != fcntl(fd, F_GETFD)) {
+    return false;
+  }
+  return errno == EBADF;
+#endif
+}
+
 static inline Status ZeroMemoryMap(MemoryMappedFile* file) {
   constexpr int64_t kBufferSize = 512;
   static constexpr uint8_t kZeroBytes[kBufferSize] = {0};


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> [C++] Add buffered output class implementing OutputStream interface
> -------------------------------------------------------------------
>
>                 Key: ARROW-2319
>                 URL: https://issues.apache.org/jira/browse/ARROW-2319
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C++
>            Reporter: Wes McKinney
>            Assignee: Antoine Pitrou
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>
> The purpose of this is to throttle smaller writes to the actual underlying
> {{OutputStream}} interface, which might be a file or network protocol.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to