kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416847365
##########
http/examples/get/client.cpp:
##########
@@ -0,0 +1,86 @@
+#include <curl/curl.h>
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+#include <chrono>
+
+// Accumulates the HTTP response body; grown by WriteMemoryCallback and freed in main.
+struct MemoryStruct {
+ char *memory;  // malloc/realloc-allocated buffer, NUL-terminated after every append
+ size_t size;   // number of payload bytes received so far (excludes the trailing NUL)
+};
+
+// libcurl write callback: appends the `size * nmemb` bytes at `contents` to the
+// MemoryStruct passed via CURLOPT_WRITEDATA and keeps the buffer NUL-terminated.
+// Returns the number of bytes consumed, or 0 when the buffer cannot be grown
+// (a short count makes libcurl abort the transfer).
+static size_t
+WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
+{
+  const size_t incoming = size * nmemb;
+  auto *buf = static_cast<MemoryStruct *>(userp);
+
+  // Reserve one extra byte for the trailing NUL terminator.
+  void *grown = realloc(buf->memory, buf->size + incoming + 1);
+  if (grown == nullptr) {
+    printf("out of memory\n");
+    return 0;
+  }
+  buf->memory = static_cast<char *>(grown);
+
+  memcpy(buf->memory + buf->size, contents, incoming);
+  buf->size += incoming;
+  buf->memory[buf->size] = '\0';
+
+  return incoming;
+}
+
+// Downloads an Arrow IPC stream from a local HTTP server into memory, decodes
+// it with RecordBatchStreamReader, and prints the byte count, the number of
+// record batches and the elapsed time. Returns non-zero on any failure.
+int main(void)
+{
+  std::string url = "http://localhost:8000";
+
+  // Response buffer: starts as a 1-byte allocation and is grown by
+  // WriteMemoryCallback as data arrives.
+  struct MemoryStruct chunk;
+  chunk.memory = static_cast<char*>(malloc(1));
+  chunk.size = 0;
+  if (!chunk.memory) {
+    fprintf(stderr, "out of memory\n");
+    return 1;
+  }
+
+  curl_global_init(CURL_GLOBAL_ALL);
+  CURL *curl_handle = curl_easy_init();
+  if (!curl_handle) {
+    fprintf(stderr, "curl_easy_init() failed\n");
+    free(chunk.memory);
+    curl_global_cleanup();
+    return 1;
+  }
+
+  curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk);
+
+  auto start_time = std::chrono::steady_clock::now();
+
+  CURLcode res = curl_easy_perform(curl_handle);
+  curl_easy_cleanup(curl_handle);
+
+  int exit_code = 0;
+  if (res != CURLE_OK) {
+    // Do not try to decode a partial or empty body after a failed transfer.
+    fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
+    exit_code = 1;
+  } else {
+    printf("%lu bytes received\n", (unsigned long)chunk.size);
+
+    auto buffer = arrow::Buffer::Wrap(reinterpret_cast<const uint8_t *>(chunk.memory), chunk.size);
+    auto input_stream = std::make_shared<arrow::io::BufferReader>(buffer);
+
+    std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
+    auto maybe_reader = arrow::ipc::RecordBatchStreamReader::Open(input_stream);
+    if (!maybe_reader.ok()) {
+      fprintf(stderr, "failed to open Arrow IPC stream: %s\n",
+              maybe_reader.status().ToString().c_str());
+      exit_code = 1;
+    } else {
+      auto record_batch_reader = *maybe_reader;
+      std::shared_ptr<arrow::RecordBatch> record_batch;
+      while (true) {
+        arrow::Status status = record_batch_reader->ReadNext(&record_batch);
+        if (!status.ok()) {
+          // Surface decode errors instead of treating them as end of stream.
+          fprintf(stderr, "failed to read record batch: %s\n", status.ToString().c_str());
+          exit_code = 1;
+          break;
+        }
+        if (!record_batch) {
+          break;  // a null batch signals end of stream
+        }
+        record_batches.push_back(record_batch);
+      }
+    }
+
+    printf("%lu record batches received\n", (unsigned long)(record_batches.size()));
+
+    auto end_time = std::chrono::steady_clock::now();
+    auto time_duration =
+        std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
+    printf("%.2f seconds elapsed\n", time_duration.count());
+  }
+
+  free(chunk.memory);
+  curl_global_cleanup();
+
+  return exit_code;
+}
+
+// to compile (for example):
+//clang++ client.cpp -std=c++17 -I/usr/local/include -L/usr/local/lib -lcurl -larrow -o client
Review Comment:
We can use `pkg-config`:
```suggestion
//clang++ client.cpp -std=c++17 $(pkg-config --cflags --libs arrow libcurl) -o client
```
##########
http/examples/get/client.cpp:
##########
Review Comment:
We can use `arrow::ipc::StreamDecoder` for this use case. It provides a "push"
style API (`arrow::ipc::RecordBatchStreamReader` provides a "pull" style API).
```cpp
#include <curl/curl.h>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <chrono>
static size_t
WriteFunction(void *contents, size_t size, size_t nmemb, void *userp)
{
size_t real_size = size * nmemb;
auto decoder = static_cast<arrow::ipc::StreamDecoder*>(userp);
if (decoder->Consume(static_cast<const uint8_t*>(contents),
real_size).ok()) {
return real_size;
} else {
return 0;
}
}
int main(void)
{
std::string url = "http://localhost:8000";
CURL *curl_handle;
CURLcode res;
// We use arrow::ipc::CollectListner() here for easy to understand
// but we can process decoded record batches as a stream by
// overriding arrow::ipc::Listener().
auto collect_listener = std::make_shared<arrow::ipc::CollectListener>();
arrow::ipc::StreamDecoder decoder(collect_listener);
curl_global_init(CURL_GLOBAL_ALL);
curl_handle = curl_easy_init();
curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteFunction);
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &decoder);
auto start_time = std::chrono::steady_clock::now();
res = curl_easy_perform(curl_handle);
printf("%ld record batches received\n",
collect_listener->num_record_batches());
auto end_time = std::chrono::steady_clock::now();
auto time_duration =
std::chrono::duration_cast<std::chrono::duration<double>>(end_time -
start_time);
printf("%.2f seconds elapsed\n", time_duration.count());
curl_easy_cleanup(curl_handle);
curl_global_cleanup();
return 0;
}
// to compile (for example):
//clang++ client.cpp -std=c++17 $(pkg-config --cflags --libs arrow libcurl) -o client
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]