westonpace commented on code in PR #13669:
URL: https://github.com/apache/arrow/pull/13669#discussion_r926657650


##########
cpp/src/arrow/memory_pool_internal.h:
##########
@@ -26,7 +26,7 @@ namespace memory_pool {
 
 namespace internal {
 
-static constexpr size_t kAlignment = 64;
+static constexpr size_t kAlignment = 512;

Review Comment:
   This is not going to work.  The impact would be too large.  Is this because 
you are using direct I/O below?  If so, we could potentially make the alignment 
configurable and create a custom memory pool and require users that want direct 
I/O to use the custom memory pool.



##########
cpp/src/arrow/compute/exec/spilling_util.cc:
##########
@@ -0,0 +1,253 @@
+#include "spilling_util.h"
+
+namespace arrow
+{
+namespace compute
+{
+
+    struct ArrayInfo
+    {
+        int64_t num_children;
+        std::array<std::shared_ptr<Buffer>, 3> bufs;
+        std::array<size_t, 3> sizes;
+        bool has_dict;
+    };
+
+    struct SpillFile::BatchInfo
+    {
+        int64_t start;
+        std::vector<ArrayInfo> arrays;
+    };
+
+#ifdef _WIN32
+#include "windows_compatibility.h"
+
+const FileHandle kInvalidHandle = INVALID_HANDLE_VALUE;
+
+static Result<FileHandle> OpenTemporaryFile()

Review Comment:
   We already have some temporary file handling utilities in util/io_util.cc.  
Is there a reason you couldn't reuse those?  Also, should this be a temporary 
directory anyway?  It seems like forcing users to configure and specify a 
spilling directory might help them to be more aware of what is going on.



##########
cpp/src/arrow/compute/exec/spilling_util.cc:
##########
@@ -0,0 +1,253 @@
+#include "spilling_util.h"
+
+namespace arrow
+{
+namespace compute
+{
+
+    struct ArrayInfo
+    {
+        int64_t num_children;
+        std::array<std::shared_ptr<Buffer>, 3> bufs;
+        std::array<size_t, 3> sizes;
+        bool has_dict;
+    };
+
+    struct SpillFile::BatchInfo
+    {
+        int64_t start;
+        std::vector<ArrayInfo> arrays;
+    };
+
+#ifdef _WIN32
+#include "windows_compatibility.h"
+
+const FileHandle kInvalidHandle = INVALID_HANDLE_VALUE;
+
+static Result<FileHandle> OpenTemporaryFile()
+{
+    constexpr DWORD kTempFileNameSize = MAX_PATH + 1;
+    wchar_t tmp_name_buf[kTempFileNameSize];
+    wchar_t tmp_path_buf[kTempFileNameSize];
+
+    DWORD ret;
+    ret = GetTempPath2W(kTempFileNameSize, tmp_path_buf);
+    if(ret > kTempFileNameSize || ret == 0)
+        return Status::IOError();
+    if(GetTempFileNameW(tmp_path_buf, L"ARROW_TMP", 0, tmp_name_buf) == 0)
+        return Status::IOError();
+
+    HANDLE file_handle = CreateFileA(
+        tmp_name_buf,
+        GENERIC_READ | GENERIC_WRITE | FILE_APPEND_DATA,
+        0,
+        NULL,
+        CREATE_ALWAYS,
+        FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED | 
FILE_FLAG_DELETE_ON_CLOSE,
+        NULL);
+    if(file_handle == INVALID_HANDLE_VALUE)
+        return Status::IOError("Failed to create temp file");
+    return file_handle;
+}
+
+static Status CloseTemporaryFile(FileHandle handle)
+{
+    if(!CloseHandle(handle))
+        return Status::IOError("Failed to close temp file");
+    return Status::OK();
+}
+
+static Status WriteBatch_PlatformSpecific(FileHandle handle, const 
SpillFile::BatchInfo &info)
+{
+    OVERLAPPED overlapped;
+    int64_t offset = info.start;
+    for(const ArrayInfo &arr : info.arrays)
+    {
+        for(int i = 0; i < 3; i++)
+        {
+            if(info.bufs[i] != 0)
+            {
+                overlapped.Offset = static_cast<DWORD>(offset);
+                overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
+                if(!WriteFile(
+                       handle,
+                       info.bufs[i]->data(),
+                       info.bufs[i]->size(),
+                       NULL,
+                       &overlapped))
+                    return Status::IOError("Failed to spill!");
+                offset += info.sizes[i];
+            }
+        }
+    }
+    return Status::OK();
+}
+#else
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+Result<FileHandle> OpenTemporaryFile()
+{
+    const char *selectors[] = { "TMPDIR", "TMP", "TEMP", "TEMPDIR" };
+    constexpr size_t kNumSelectors = sizeof(selectors) / sizeof(selectors[0]);
+#ifdef __ANDROID__
+    const char *backup = "/data/local/tmp/";
+#else
+    const char *backup = "/tmp/";
+#endif
+    const char *tmp_dir = backup;
+    for(size_t i = 0; i < kNumSelectors; i++)
+    {
+        const char *env = getenv(selectors[i]);
+        if(env)
+        {
+            tmp_dir = env;
+            break;
+        }
+    }
+    size_t tmp_dir_length = std::strlen(tmp_dir);
+
+    const char *tmp_name_template = "/ARROW_TMP_XXXXXX";
+    size_t tmp_name_length = std::strlen(tmp_name_template);
+
+    constexpr int kFileNameSize = 1024;
+
+    if((tmp_dir_length + tmp_name_length) >= kFileNameSize)
+    {
+        tmp_dir = backup;
+        tmp_dir_length = std::strlen(backup);
+    }
+
+    char name[kFileNameSize + 1] = {};
+    std::strncpy(name, tmp_dir, kFileNameSize);
+    std::strncpy(name + tmp_dir_length, tmp_name_template, kFileNameSize - 
tmp_dir_length);
+
+#ifdef __APPLE__
+    int fd = mkstemp(name);
+    if(fd == kInvalidHandle)
+        return Status::IOError(strerror(errno));
+    if(fcntl(fd, F_NOCACHE, 1) == -1)
+        return Status::IOError(strerror(errno));
+#else    
+    int fd = mkostemp(name, O_DIRECT);

Review Comment:
   There was some discussion of direct I/O in #13640.  I think there are 
concerns and it should at least be opt-in until we have a better picture of 
real-world performance, especially considering the alignment requirements.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to