[
https://issues.apache.org/jira/browse/PARQUET-1092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16294087#comment-16294087
]
ASF GitHub Bot commented on PARQUET-1092:
-----------------------------------------
xhochy closed pull request #426: PARQUET-1092: Support writing chunked
arrow::Table columns
URL: https://github.com/apache/parquet-cpp/pull/426
This is a PR merged from a forked repository. As GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc
b/src/parquet/arrow/arrow-reader-writer-test.cc
index 02f37517..db12fb45 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,7 +24,10 @@
#include "gtest/gtest.h"
#include <arrow/compute/api.h>
+#include <cstdint>
+#include <functional>
#include <sstream>
+#include <vector>
#include "parquet/api/reader.h"
#include "parquet/api/writer.h"
@@ -38,6 +41,7 @@
#include "arrow/api.h"
#include "arrow/test-util.h"
+#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"
using arrow::Array;
@@ -45,6 +49,7 @@ using arrow::ArrayVisitor;
using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::Column;
+using arrow::DataType;
using arrow::ListArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
@@ -77,7 +82,7 @@ static constexpr int LARGE_SIZE = 10000;
static constexpr uint32_t kDefaultSeed = 0;
-LogicalType::type get_logical_type(const ::arrow::DataType& type) {
+LogicalType::type get_logical_type(const ::DataType& type) {
switch (type.id()) {
case ArrowId::UINT8:
return LogicalType::UINT_8;
@@ -130,7 +135,7 @@ LogicalType::type get_logical_type(const ::arrow::DataType&
type) {
return LogicalType::NONE;
}
-ParquetType::type get_physical_type(const ::arrow::DataType& type) {
+ParquetType::type get_physical_type(const ::DataType& type) {
switch (type.id()) {
case ArrowId::BOOL:
return ParquetType::BOOLEAN;
@@ -325,6 +330,73 @@ void WriteTableToBuffer(const std::shared_ptr<Table>&
table, int num_threads,
*out = sink->GetBuffer();
}
+namespace internal {
+
+void AssertArraysEqual(const Array& expected, const Array& actual) {
+ if (!actual.Equals(expected)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result));
+ EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected));
+ FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" <<
pp_expected.str();
+ }
+}
+
+} // namespace internal
+
+void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray&
actual) {
+ ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
+ if (!actual.Equals(expected)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ for (int i = 0; i < actual.num_chunks(); ++i) {
+ auto c1 = actual.chunk(i);
+ auto c2 = expected.chunk(i);
+ if (!c1->Equals(*c2)) {
+ EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
+ EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
+ FAIL() << "Chunk " << i << " Got: " << pp_result.str()
+ << "\nExpected: " << pp_expected.str();
+ }
+ }
+ }
+}
+
+void PrintColumn(const Column& col, std::stringstream* ss) {
+ const ChunkedArray& carr = *col.data();
+ for (int i = 0; i < carr.num_chunks(); ++i) {
+ auto c1 = carr.chunk(i);
+ *ss << "Chunk " << i << std::endl;
+ EXPECT_OK(::arrow::PrettyPrint(*c1, 0, ss));
+ *ss << std::endl;
+ }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual,
+ bool same_chunk_layout = true) {
+ ASSERT_EQ(expected.num_columns(), actual.num_columns());
+
+ if (same_chunk_layout) {
+ for (int i = 0; i < actual.num_columns(); ++i) {
+ AssertChunkedEqual(*expected.column(i)->data(),
*actual.column(i)->data());
+ }
+ } else {
+ std::stringstream ss;
+ if (!actual.Equals(expected)) {
+ for (int i = 0; i < expected.num_columns(); ++i) {
+ ss << "Actual column " << i << std::endl;
+ PrintColumn(*actual.column(i), &ss);
+
+ ss << "Expected column " << i << std::endl;
+ PrintColumn(*expected.column(i), &ss);
+ }
+ FAIL() << ss.str();
+ }
+ }
+}
+
void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
int64_t row_group_size, const std::vector<int>&
column_subset,
std::shared_ptr<Table>* out,
@@ -348,7 +420,15 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>&
table, int num_threads,
}
}
-static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType&
type,
+void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t
row_group_size,
+ const std::shared_ptr<ArrowWriterProperties>&
arrow_properties =
+ default_arrow_writer_properties()) {
+ std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
+ AssertTablesEqual(*table, *result, false);
+}
+
+static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
Repetition::type
repetition) {
int32_t byte_width = -1;
int32_t precision = -1;
@@ -357,7 +437,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const
::arrow::DataType& type
switch (type.id()) {
case ::arrow::Type::DICTIONARY: {
const auto& dict_type = static_cast<const
::arrow::DictionaryType&>(type);
- const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
+ const ::DataType& values_type = *dict_type.dictionary()->type();
switch (values_type.id()) {
case ::arrow::Type::FIXED_SIZE_BINARY:
byte_width =
@@ -393,49 +473,6 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const
::arrow::DataType& type
return std::static_pointer_cast<GroupNode>(node_);
}
-namespace internal {
-
-void AssertArraysEqual(const Array& expected, const Array& actual) {
- if (!actual.Equals(expected)) {
- std::stringstream pp_result;
- std::stringstream pp_expected;
-
- EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result));
- EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected));
- FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" <<
pp_expected.str();
- }
-}
-
-} // namespace internal
-
-void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray&
actual) {
- ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
- if (!actual.Equals(expected)) {
- std::stringstream pp_result;
- std::stringstream pp_expected;
-
- for (int i = 0; i < actual.num_chunks(); ++i) {
- auto c1 = actual.chunk(i);
- auto c2 = expected.chunk(i);
- if (!c1->Equals(*c2)) {
- EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
- EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
- FAIL() << "Chunk " << i << " Got: " << pp_result.str()
- << "\nExpected: " << pp_expected.str();
- }
- }
- }
-}
-
-void AssertTablesEqual(const Table& expected, const Table& actual) {
- ASSERT_EQ(expected.num_columns(), actual.num_columns());
-
- for (int i = 0; i < actual.num_columns(); ++i) {
- AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
- }
- ASSERT_TRUE(actual.Equals(expected));
-}
-
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
@@ -527,10 +564,7 @@ class TestParquetIO : public ::testing::Test {
}
void CheckRoundTrip(const std::shared_ptr<Table>& table) {
- std::shared_ptr<Table> result;
- DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
-
- AssertTablesEqual(*table, *result);
+ CheckSimpleRoundtrip(table, table->num_rows());
}
template <typename ArrayType>
@@ -1315,32 +1349,48 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
auto f0 = field("f0", ::arrow::date64());
auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
- std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1}));
+ auto f2 = field("f2", ::arrow::date64());
+ auto f3 = field("f3", ::arrow::time32(TimeUnit::SECOND));
+
+ auto schema = ::arrow::schema({f0, f1, f2, f3});
std::vector<int64_t> a0_values = {1489190400000, 1489276800000,
1489363200000,
1489449600000, 1489536000000,
1489622400000};
std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
- std::shared_ptr<Array> a0, a1, x0, x1;
+ std::shared_ptr<Array> a0, a1, a0_nonnull, a1_nonnull, x0, x1, x0_nonnull,
x1_nonnull;
+
ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid,
a0_values, &a0);
+ ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), a0_values,
&a0_nonnull);
+
ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid,
a1_values, &a1);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), a1_values,
&a1_nonnull);
std::vector<std::shared_ptr<::arrow::Column>> columns = {
- std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1)};
+ std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
+ std::make_shared<Column>("f2", a0_nonnull),
+ std::make_shared<Column>("f3", a1_nonnull)};
auto table = Table::Make(schema, columns);
// Expected schema and values
auto e0 = field("f0", ::arrow::date32());
auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
- std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1}));
+ auto e2 = field("f2", ::arrow::date32());
+ auto e3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
+ auto ex_schema = ::arrow::schema({e0, e1, e2, e3});
std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid,
x0_values, &x0);
+ ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), x0_values,
&x0_nonnull);
+
ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid,
x1_values, &x1);
+ ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), x1_values,
&x1_nonnull);
std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
- std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1)};
+ std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1),
+ std::make_shared<Column>("f2", x0_nonnull),
+ std::make_shared<Column>("f3", x1_nonnull)};
auto ex_table = Table::Make(ex_schema, ex_columns);
std::shared_ptr<Table> result;
@@ -1375,6 +1425,43 @@ void MakeDoubleTable(int num_columns, int num_rows, int
nchunks,
*out = Table::Make(schema, columns);
}
+void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
+ std::shared_ptr<Array>* out_array) {
+ ::arrow::Int32Builder offset_builder;
+
+ std::vector<int32_t> length_draws;
+ randint(num_rows, 0, 100, &length_draws);
+
+ std::vector<int32_t> offset_values;
+
+ // Make sure some of them are length 0
+ int32_t total_elements = 0;
+ for (size_t i = 0; i < length_draws.size(); ++i) {
+ if (length_draws[i] < 10) {
+ length_draws[i] = 0;
+ }
+ offset_values.push_back(total_elements);
+ total_elements += length_draws[i];
+ }
+ offset_values.push_back(total_elements);
+
+ std::vector<int8_t> value_draws;
+ randint(total_elements, 0, 100, &value_draws);
+
+ std::vector<bool> is_valid;
+ random_is_valid(total_elements, 0.1, &is_valid);
+
+ std::shared_ptr<Array> values, offsets;
+ ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(),
is_valid,
+ value_draws, &values);
+ ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values,
&offsets);
+
+ ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values,
default_memory_pool(),
+ out_array));
+
+ *out_type = ::arrow::list(::arrow::int8());
+}
+
TEST(TestArrowReadWrite, MultithreadedRead) {
const int num_columns = 20;
const int num_rows = 1000;
@@ -1464,51 +1551,16 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
AssertTablesEqual(*expected, *result);
}
-void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
- ::arrow::Int32Builder offset_builder;
-
- std::vector<int32_t> length_draws;
- randint(num_rows, 0, 100, &length_draws);
-
- std::vector<int32_t> offset_values;
-
- // Make sure some of them are length 0
- int32_t total_elements = 0;
- for (size_t i = 0; i < length_draws.size(); ++i) {
- if (length_draws[i] < 10) {
- length_draws[i] = 0;
- }
- offset_values.push_back(total_elements);
- total_elements += length_draws[i];
- }
- offset_values.push_back(total_elements);
-
- std::vector<int8_t> value_draws;
- randint(total_elements, 0, 100, &value_draws);
-
- std::vector<bool> is_valid;
- random_is_valid(total_elements, 0.1, &is_valid);
-
- std::shared_ptr<Array> values, offsets;
- ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(),
is_valid,
- value_draws, &values);
- ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values,
&offsets);
+TEST(TestArrowReadWrite, ListLargeRecords) {
+ const int num_rows = 50;
std::shared_ptr<Array> list_array;
- ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values,
default_memory_pool(),
- &list_array));
+ std::shared_ptr<::DataType> list_type;
- auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
- auto schema = ::arrow::schema({f1});
- std::vector<std::shared_ptr<Array>> arrays = {list_array};
- *out = Table::Make(schema, arrays);
-}
-
-TEST(TestArrowReadWrite, ListLargeRecords) {
- const int num_rows = 50;
+ MakeListArray(num_rows, &list_type, &list_array);
- std::shared_ptr<Table> table;
- MakeListTable(num_rows, &table);
+ auto schema = ::arrow::schema({::arrow::field("a", list_type)});
+ std::shared_ptr<Table> table = Table::Make(schema, {list_array});
std::shared_ptr<Buffer> buffer;
WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(),
&buffer);
@@ -1549,6 +1601,74 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
ASSERT_TRUE(table->Equals(*chunked_table));
}
+typedef std::function<void(int, std::shared_ptr<::DataType>*,
std::shared_ptr<Array>*)>
+ ArrayFactory;
+
+template <typename ArrowType>
+struct GenerateArrayFunctor {
+ explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {}
+
+ void operator()(int length, std::shared_ptr<::DataType>* type,
+ std::shared_ptr<Array>* array) {
+ using T = typename ArrowType::c_type;
+
+ // TODO(wesm): generate things other than integers
+ std::vector<T> draws;
+ randint(length, 0, 100, &draws);
+
+ std::vector<bool> is_valid;
+ random_is_valid(length, this->pct_null, &is_valid);
+
+ *type = ::arrow::TypeTraits<ArrowType>::type_singleton();
+ ::arrow::ArrayFromVector<ArrowType, T>(*type, is_valid, draws, array);
+ }
+
+ double pct_null;
+};
+
+typedef std::function<void(int, std::shared_ptr<::DataType>*,
std::shared_ptr<Array>*)>
+ ArrayFactory;
+
+auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
+ std::shared_ptr<Array>* array) {
+ GenerateArrayFunctor<::arrow::Int32Type> func;
+ func(length, type, array);
+};
+
+auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
+ std::shared_ptr<Array>* array) {
+ MakeListArray(length, type, array);
+};
+
+TEST(TestArrowReadWrite, TableWithChunkedColumns) {
+ std::vector<ArrayFactory> functions = {GenerateInt32, GenerateList};
+
+ std::vector<int> chunk_sizes = {2, 4, 10, 2};
+ const int64_t total_length = 18;
+
+ for (const auto& datagen_func : functions) {
+ ::arrow::ArrayVector arrays;
+ std::shared_ptr<Array> arr;
+ std::shared_ptr<::DataType> type;
+ datagen_func(total_length, &type, &arr);
+
+ int64_t offset = 0;
+ for (int chunk_size : chunk_sizes) {
+ arrays.push_back(arr->Slice(offset, chunk_size));
+ offset += chunk_size;
+ }
+
+ auto field = ::arrow::field("fname", type);
+ auto schema = ::arrow::schema({field});
+ auto col = std::make_shared<::arrow::Column>(field, arrays);
+ auto table = Table::Make(schema, {col});
+
+ CheckSimpleRoundtrip(table, 2);
+ CheckSimpleRoundtrip(table, 3);
+ CheckSimpleRoundtrip(table, 10);
+ }
+}
+
TEST(TestArrowWrite, CheckChunkSize) {
const int num_columns = 2;
const int num_rows = 128;
@@ -1943,13 +2063,13 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
class TestArrowReaderAdHocSpark
: public ::testing::TestWithParam<
- std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};
+ std::tuple<std::string, std::shared_ptr<::DataType>>> {};
TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
std::string path(std::getenv("PARQUET_TEST_DATA"));
std::string filename;
- std::shared_ptr<::arrow::DataType> decimal_type;
+ std::shared_ptr<::DataType> decimal_type;
std::tie(filename, decimal_type) = GetParam();
path += "/" + filename;
diff --git a/src/parquet/arrow/arrow-schema-test.cc
b/src/parquet/arrow/arrow-schema-test.cc
index 771b9960..b33eda14 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs))
- << i << " " << lhs->ToString() << " != " << rhs->ToString();
+ EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+ << " != " << rhs->ToString();
}
}
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 53065a68..93183051 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -717,8 +717,7 @@ struct supports_fast_path_impl<ArrowType, FLBAType> {
template <typename ArrowType, typename ParquetType>
using supports_fast_path =
- typename std::enable_if<supports_fast_path_impl<ArrowType,
ParquetType>::value,
- ParquetType>::type;
+ typename std::enable_if<supports_fast_path_impl<ArrowType,
ParquetType>::value>::type;
template <typename ArrowType, typename ParquetType, typename Enable = void>
struct TransferFunctor {
@@ -728,6 +727,10 @@ struct TransferFunctor {
Status operator()(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<::arrow::DataType>& type,
std::shared_ptr<Array>* out) {
+ static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value,
+ "The fast path transfer functor should be used "
+ "for primitive values");
+
int64_t length = reader->values_written();
std::shared_ptr<Buffer> data;
RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType),
&data));
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index d7001aab..85d5bd3f 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -31,6 +31,7 @@
using arrow::Array;
using arrow::BinaryArray;
+using arrow::ChunkedArray;
using arrow::FixedSizeBinaryArray;
using arrow::Decimal128Array;
using arrow::BooleanArray;
@@ -65,12 +66,12 @@ std::shared_ptr<ArrowWriterProperties>
default_arrow_writer_properties() {
return default_writer_properties;
}
+namespace {
+
class LevelBuilder {
public:
explicit LevelBuilder(MemoryPool* pool)
- : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(),
pool) {
- def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
- }
+ : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(),
pool) {}
Status VisitInline(const Array& array);
@@ -80,7 +81,6 @@ class LevelBuilder {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
- values_type_ = array.type_id();
values_array_ = std::make_shared<T>(array.data());
return Status::OK();
}
@@ -106,13 +106,12 @@ class LevelBuilder {
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(Dictionary)
- NOT_IMPLEMENTED_VISIT(Interval)
Status GenerateLevels(const Array& array, const std::shared_ptr<Field>&
field,
- int64_t* values_offset, ::arrow::Type::type*
values_type,
- int64_t* num_values, int64_t* num_levels,
- std::shared_ptr<Buffer>* def_levels,
- std::shared_ptr<Buffer>* rep_levels,
+ int64_t* values_offset, int64_t* num_values, int64_t*
num_levels,
+ const std::shared_ptr<PoolBuffer>& def_levels_scratch,
+ std::shared_ptr<Buffer>* def_levels_out,
+ std::shared_ptr<Buffer>* rep_levels_out,
std::shared_ptr<Array>* values_array) {
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
@@ -120,7 +119,6 @@ class LevelBuilder {
RETURN_NOT_OK(VisitInline(array));
*num_values = max_offset_idx_ - min_offset_idx_;
*values_offset = min_offset_idx_;
- *values_type = values_type_;
*values_array = values_array_;
// Walk downwards to extract nullability
@@ -139,11 +137,12 @@ class LevelBuilder {
// Generate the levels.
if (nullable_.size() == 1) {
// We have a PrimitiveArray
- *rep_levels = nullptr;
+ *rep_levels_out = nullptr;
if (nullable_[0]) {
- RETURN_NOT_OK(def_levels_buffer_->Resize(array.length() *
sizeof(int16_t)));
+ RETURN_NOT_OK(
+ def_levels_scratch->Resize(array.length() * sizeof(int16_t),
false));
auto def_levels_ptr =
- reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
+ reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
} else if (array.null_count() == array.length()) {
@@ -152,17 +151,14 @@ class LevelBuilder {
::arrow::internal::BitmapReader valid_bits_reader(
array.null_bitmap_data(), array.offset(), array.length());
for (int i = 0; i < array.length(); i++) {
- if (valid_bits_reader.IsSet()) {
- def_levels_ptr[i] = 1;
- } else {
- def_levels_ptr[i] = 0;
- }
+ def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
valid_bits_reader.Next();
}
}
- *def_levels = def_levels_buffer_;
+
+ *def_levels_out = def_levels_scratch;
} else {
- *def_levels = nullptr;
+ *def_levels_out = nullptr;
}
*num_levels = array.length();
} else {
@@ -170,12 +166,13 @@ class LevelBuilder {
RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));
std::shared_ptr<Array> def_levels_array;
- RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
- *def_levels =
static_cast<PrimitiveArray*>(def_levels_array.get())->values();
-
std::shared_ptr<Array> rep_levels_array;
+
+ RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));
- *rep_levels =
static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
+
+ *def_levels_out =
static_cast<PrimitiveArray*>(def_levels_array.get())->values();
+ *rep_levels_out =
static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
*num_levels = rep_levels_array->length();
}
@@ -242,7 +239,6 @@ class LevelBuilder {
private:
Int16Builder def_levels_;
- std::shared_ptr<PoolBuffer> def_levels_buffer_;
Int16Builder rep_levels_;
std::vector<int64_t> null_counts_;
@@ -253,7 +249,6 @@ class LevelBuilder {
int64_t min_offset_idx_;
int64_t max_offset_idx_;
- ::arrow::Type::type values_type_;
std::shared_ptr<Array> values_array_;
};
@@ -261,164 +256,221 @@ Status LevelBuilder::VisitInline(const Array& array) {
return VisitArrayInline(array, this);
}
-class FileWriter::Impl {
+struct ColumnWriterContext {
+ ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties*
properties)
+ : memory_pool(memory_pool), properties(properties) {
+ this->data_buffer = std::make_shared<PoolBuffer>(memory_pool);
+ this->def_levels_buffer = std::make_shared<PoolBuffer>(memory_pool);
+ }
+
+ template <typename T>
+ Status GetScratchData(const int64_t num_values, T** out) {
+ RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
+ *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
+ return Status::OK();
+ }
+
+ MemoryPool* memory_pool;
+ ArrowWriterProperties* properties;
+
+ // Buffer used for storing the data of an array converted to the physical
type
+ // as expected by parquet-cpp.
+ std::shared_ptr<PoolBuffer> data_buffer;
+
+ // We use the shared ownership of this buffer
+ std::shared_ptr<PoolBuffer> def_levels_buffer;
+};
+
+Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type*
leaf_type) {
+ if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
+ if (type.num_children() != 1) {
+ return Status::Invalid("Nested column branch had multiple children");
+ }
+ return GetLeafType(*type.child(0)->type(), leaf_type);
+ } else {
+ *leaf_type = type.id();
+ return Status::OK();
+ }
+}
+
+class ArrowColumnWriter {
public:
- Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+ ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
+ const std::shared_ptr<Field>& field)
+ : ctx_(ctx), writer_(column_writer), field_(field) {}
+
+ Status Write(const Array& data);
+
+ Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
+ int64_t absolute_position = 0;
+ int chunk_index = 0;
+ int64_t chunk_offset = 0;
+ while (chunk_index < data.num_chunks() && absolute_position < offset) {
+ const int64_t chunk_length = data.chunk(chunk_index)->length();
+ if (absolute_position + chunk_length > offset) {
+ // Relative offset into the chunk to reach the desired start offset for
+ // writing
+ chunk_offset = offset - absolute_position;
+ break;
+ } else {
+ ++chunk_index;
+ absolute_position += chunk_length;
+ }
+ }
+
+ if (absolute_position >= data.length()) {
+ return Status::Invalid("Cannot write data at offset past end of chunked
array");
+ }
+
+ int64_t values_written = 0;
+ while (values_written < size) {
+ const Array& chunk = *data.chunk(chunk_index);
+ const int64_t available_values = chunk.length() - chunk_offset;
+ const int64_t chunk_write_size = std::min(size - values_written,
available_values);
+
+ // The chunk offset here will be 0 except for possibly the first chunk
+ // because of the advancing logic above
+ std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset,
chunk_write_size);
+ RETURN_NOT_OK(Write(*array_to_write));
+
+ if (chunk_write_size == available_values) {
+ chunk_offset = 0;
+ ++chunk_index;
+ }
+ values_written += chunk_write_size;
+ }
+
+ return Status::OK();
+ }
- Status NewRowGroup(int64_t chunk_size);
+ Status Close() {
+ PARQUET_CATCH_NOT_OK(writer_->Close());
+ return Status::OK();
+ }
+ private:
template <typename ParquetType, typename ArrowType>
- Status TypedWriteBatch(ColumnWriter* column_writer, const
std::shared_ptr<Array>& data,
- int64_t num_levels, const int16_t* def_levels,
+ Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t*
def_levels,
const int16_t* rep_levels);
- Status WriteTimestamps(ColumnWriter* column_writer, const
std::shared_ptr<Array>& data,
- int64_t num_levels, const int16_t* def_levels,
+ Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t*
def_levels,
const int16_t* rep_levels);
- Status WriteTimestampsCoerce(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& data, int64_t
num_levels,
+ Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
const int16_t* def_levels, const int16_t*
rep_levels);
template <typename ParquetType, typename ArrowType>
- Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
- const ArrowType& type, int64_t num_values,
+ Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels,
- const typename ArrowType::c_type* data_ptr);
+ const typename ArrowType::c_type* values);
template <typename ParquetType, typename ArrowType>
- Status WriteNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
- const ArrowType& type, int64_t num_values, int64_t
num_levels,
+ Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t
num_levels,
const int16_t* def_levels, const int16_t*
rep_levels,
const uint8_t* valid_bits, int64_t
valid_bits_offset,
- const typename ArrowType::c_type* data_ptr);
-
- Status WriteColumnChunk(const Array& data);
- Status Close();
-
- const WriterProperties& properties() const { return *writer_->properties(); }
-
- virtual ~Impl() {}
-
- private:
- friend class FileWriter;
-
- MemoryPool* pool_;
- // Buffer used for storing the data of an array converted to the physical
type
- // as expected by parquet-cpp.
- PoolBuffer data_buffer_;
- std::unique_ptr<ParquetFileWriter> writer_;
- RowGroupWriter* row_group_writer_;
- std::shared_ptr<ArrowWriterProperties> arrow_properties_;
- bool closed_;
-};
+ const typename ArrowType::c_type* values);
+
+ template <typename ParquetType>
+ Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const typename ParquetType::c_type* values) {
+ auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ PARQUET_CATCH_NOT_OK(
+ typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
+ return Status::OK();
+ }
-FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter>
writer,
- const std::shared_ptr<ArrowWriterProperties>&
arrow_properties)
- : pool_(pool),
- data_buffer_(pool),
- writer_(std::move(writer)),
- row_group_writer_(nullptr),
- arrow_properties_(arrow_properties),
- closed_(false) {}
-
-Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
- if (row_group_writer_ != nullptr) {
- PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ template <typename ParquetType>
+ Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ const typename ParquetType::c_type* values) {
+ auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
+ num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset,
values));
+ return Status::OK();
}
- PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
- return Status::OK();
-}
-// ----------------------------------------------------------------------
-// Column type specialization
+ ColumnWriterContext* ctx_;
+ ColumnWriter* writer_;
+ std::shared_ptr<Field> field_;
+};
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& array,
- int64_t num_levels, const int16_t*
def_levels,
- const int16_t* rep_levels) {
+Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t
num_levels,
+ const int16_t* def_levels,
+ const int16_t* rep_levels) {
using ArrowCType = typename ArrowType::c_type;
- const auto& data = static_cast<const PrimitiveArray&>(*array);
- auto data_ptr =
+ const auto& data = static_cast<const PrimitiveArray&>(array);
+ auto values =
reinterpret_cast<const ArrowCType*>(data.values()->data()) +
data.offset();
- auto writer =
reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
- if (writer->descr()->schema_node()->is_required() || (data.null_count() ==
0)) {
+ if (writer_->descr()->schema_node()->is_required() || (data.null_count() ==
0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
- writer, static_cast<const ArrowType&>(*array->type()), array->length(),
- num_levels, def_levels, rep_levels, data_ptr)));
+ static_cast<const ArrowType&>(*array.type()), array.length(),
num_levels,
+ def_levels, rep_levels, values)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
- writer, static_cast<const ArrowType&>(*array->type()), data.length(),
num_levels,
- def_levels, rep_levels, valid_bits, data.offset(), data_ptr)));
+ static_cast<const ArrowType&>(*array.type()), data.length(),
num_levels,
+ def_levels, rep_levels, valid_bits, data.offset(), values)));
}
- PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNonNullableBatch(
- TypedColumnWriter<ParquetType>* writer, const ArrowType& type, int64_t
num_values,
- int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
- const typename ArrowType::c_type* data_ptr) {
+Status ArrowColumnWriter::WriteNonNullableBatch(
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ const typename ArrowType::c_type* values) {
using ParquetCType = typename ParquetType::c_type;
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
- auto buffer_ptr =
reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
- std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+ ParquetCType* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
+
+ std::copy(values, values + num_values, buffer);
+
+ return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type,
::arrow::Date64Type>(
+ const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int64_t*
values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
+
for (int i = 0; i < num_values; i++) {
- buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+ buffer[i] = static_cast<int32_t>(values[i] / 86400000);
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+
+ return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const int32_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type,
::arrow::Time32Type>(
+ const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int32_t*
values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
if (type.unit() == TimeUnit::SECOND) {
for (int i = 0; i < num_values; i++) {
- buffer_ptr[i] = data_ptr[i] * 1000;
+ buffer[i] = values[i] * 1000;
}
} else {
- std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+ std::copy(values, values + num_values, buffer);
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+ return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
}
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
- template <> \
- Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
- TypedColumnWriter<ParquetType> * writer, const ArrowType& type, \
- int64_t num_values, int64_t num_levels, const int16_t* def_levels, \
- const int16_t* rep_levels, const CType* data_ptr) { \
- PARQUET_CATCH_NOT_OK( \
- writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
- return Status::OK(); \
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)
\
+ template <>
\
+ Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>(
\
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
\
+ const int16_t* def_levels, const int16_t* rep_levels, const CType*
buffer) { \
+ return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels,
buffer); \
}
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
@@ -429,96 +481,68 @@ NONNULLABLE_BATCH_FAST_PATH(FloatType,
::arrow::FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>*
writer,
- const ArrowType& type, int64_t
num_values,
- int64_t num_levels, const int16_t*
def_levels,
- const int16_t* rep_levels,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- const typename ArrowType::c_type*
data_ptr) {
+Status ArrowColumnWriter::WriteNullableBatch(
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t*
valid_bits,
+ int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
using ParquetCType = typename ParquetType::c_type;
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
- auto buffer_ptr =
reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits,
valid_bits_offset,
- num_values);
+ ParquetCType* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_levels, &buffer));
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
- }
- valid_bits_reader.Next();
+ buffer[i] = static_cast<ParquetCType>(values[i]);
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset,
buffer_ptr));
- return Status::OK();
+ return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels,
valid_bits,
+ valid_bits_offset, buffer);
}
template <>
-Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t
valid_bits_offset,
- const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits,
valid_bits_offset,
- num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
+ const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t*
valid_bits,
+ int64_t valid_bits_offset, const int64_t* values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
+
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- // Convert from milliseconds into days since the epoch
- buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
- }
- valid_bits_reader.Next();
+ // Convert from milliseconds into days since the epoch
+ buffer[i] = static_cast<int32_t>(values[i] / 86400000);
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset,
buffer_ptr));
- return Status::OK();
+ return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels,
valid_bits,
+ valid_bits_offset, buffer);
}
template <>
-Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
- TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t
valid_bits_offset,
- const int32_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
- auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits,
valid_bits_offset,
- num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
+ const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t*
valid_bits,
+ int64_t valid_bits_offset, const int32_t* values) {
+ int32_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
if (type.unit() == TimeUnit::SECOND) {
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- buffer_ptr[i] = data_ptr[i] * 1000;
- }
- valid_bits_reader.Next();
+ buffer[i] = values[i] * 1000;
}
} else {
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- buffer_ptr[i] = data_ptr[i];
- }
- valid_bits_reader.Next();
+ buffer[i] = values[i];
}
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset,
buffer_ptr));
-
- return Status::OK();
+ return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels,
valid_bits,
+ valid_bits_offset, buffer);
}
-#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)
\
- template <>
\
- Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(
\
- TypedColumnWriter<ParquetType> * writer, const ArrowType& type,
\
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
\
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t
valid_bits_offset, \
- const CType* data_ptr) {
\
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
\
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset,
data_ptr)); \
- return Status::OK();
\
+#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)
\
+ template <>
\
+ Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>(
\
+ const ArrowType& type, int64_t num_values, int64_t num_levels,
\
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t*
valid_bits, \
+ int64_t valid_bits_offset, const CType* values) {
\
+ return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels,
valid_bits, \
+ valid_bits_offset, values);
\
}
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
@@ -527,122 +551,99 @@ NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type,
int64_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-
-// ----------------------------------------------------------------------
-// Write timestamps
-
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
template <>
-Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
- TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t
valid_bits_offset,
- const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
- auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
- ::arrow::internal::BitmapReader valid_bits_reader(valid_bits,
valid_bits_offset,
- num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int96Type,
::arrow::TimestampType>(
+ const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t*
valid_bits,
+ int64_t valid_bits_offset, const int64_t* values) {
+ Int96* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
if (type.unit() == TimeUnit::NANO) {
for (int i = 0; i < num_values; i++) {
- if (valid_bits_reader.IsSet()) {
- internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
- }
- valid_bits_reader.Next();
+ internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]);
}
} else {
return Status::NotImplemented("Only NANO timestamps are supported for
Int96 writing");
}
- PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset,
buffer_ptr));
-
- return Status::OK();
+ return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels,
valid_bits,
+ valid_bits_offset, buffer);
}
template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int96Type,
::arrow::TimestampType>(
- TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const int64_t* data_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
- auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type,
::arrow::TimestampType>(
+ const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int64_t*
values) {
+ Int96* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
if (type.unit() == TimeUnit::NANO) {
for (int i = 0; i < num_values; i++) {
- internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+ internal::NanosecondsToImpalaTimestamp(values[i], buffer + i);
}
} else {
return Status::NotImplemented("Only NANO timestamps are supported for
Int96 writing");
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- return Status::OK();
+ return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
}
-Status FileWriter::Impl::WriteTimestamps(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& values,
- int64_t num_levels, const int16_t*
def_levels,
- const int16_t* rep_levels) {
- const auto& type = static_cast<::arrow::TimestampType&>(*values->type());
+Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t
num_levels,
+ const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ const auto& type = static_cast<const
::arrow::TimestampType&>(*values.type());
const bool is_nanosecond = type.unit() == TimeUnit::NANO;
- if (is_nanosecond &&
arrow_properties_->support_deprecated_int96_timestamps()) {
- return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
- column_writer, values, num_levels, def_levels, rep_levels);
+ if (is_nanosecond &&
ctx_->properties->support_deprecated_int96_timestamps()) {
+ return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values,
num_levels,
+ def_levels,
rep_levels);
} else if (is_nanosecond ||
- (arrow_properties_->coerce_timestamps_enabled() &&
- (type.unit() != arrow_properties_->coerce_timestamps_unit()))) {
+ (ctx_->properties->coerce_timestamps_enabled() &&
+ (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
// Casting is required. This covers several cases
// * Nanoseconds -> cast to microseconds
// * coerce_timestamps_enabled_, cast all timestamps to requested unit
- return WriteTimestampsCoerce(column_writer, values, num_levels, def_levels,
- rep_levels);
+ return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
} else {
// No casting of timestamps is required, take the fast path
- return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
- column_writer, values, num_levels, def_levels, rep_levels);
+ return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values,
num_levels,
+ def_levels,
rep_levels);
}
}
-Status FileWriter::Impl::WriteTimestampsCoerce(ColumnWriter* column_writer,
- const std::shared_ptr<Array>&
array,
- int64_t num_levels,
- const int16_t* def_levels,
- const int16_t* rep_levels) {
- // Note that we can only use data_buffer_ here as we write timestamps with
the fast
- // path.
- RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
- int64_t* data_buffer_ptr =
reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
-
- const auto& data = static_cast<const ::arrow::TimestampArray&>(*array);
+Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t
num_levels,
+ const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ int64_t* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
- auto data_ptr = data.raw_values();
- auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
+ const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
- const auto& type = static_cast<const
::arrow::TimestampType&>(*array->type());
+ auto values = data.raw_values();
+ const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
- TimeUnit::type target_unit = arrow_properties_->coerce_timestamps_enabled()
- ?
arrow_properties_->coerce_timestamps_unit()
+ TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
+ ? ctx_->properties->coerce_timestamps_unit()
: TimeUnit::MICRO;
auto target_type = ::arrow::timestamp(target_unit);
auto DivideBy = [&](const int64_t factor) {
- for (int64_t i = 0; i < array->length(); i++) {
- if (!data.IsNull(i) && (data_ptr[i] % factor != 0)) {
+ for (int64_t i = 0; i < array.length(); i++) {
+ if (!data.IsNull(i) && (values[i] % factor != 0)) {
std::stringstream ss;
ss << "Casting from " << type.ToString() << " to " <<
target_type->ToString()
- << " would lose data: " << data_ptr[i];
+ << " would lose data: " << values[i];
return Status::Invalid(ss.str());
}
- data_buffer_ptr[i] = data_ptr[i] / factor;
+ buffer[i] = values[i] / factor;
}
return Status::OK();
};
auto MultiplyBy = [&](const int64_t factor) {
- for (int64_t i = 0; i < array->length(); i++) {
- data_buffer_ptr[i] = data_ptr[i] * factor;
+ for (int64_t i = 0; i < array.length(); i++) {
+ buffer[i] = values[i] * factor;
}
return Status::OK();
};
@@ -664,156 +665,140 @@ Status
FileWriter::Impl::WriteTimestampsCoerce(ColumnWriter* column_writer,
RETURN_NOT_OK(DivideBy(1000));
}
- if (writer->descr()->schema_node()->is_required() || (data.null_count() ==
0)) {
+ if (writer_->descr()->schema_node()->is_required() || (data.null_count() ==
0)) {
// no nulls, just dump the data
RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
- writer, static_cast<const ::arrow::TimestampType&>(*target_type),
array->length(),
- num_levels, def_levels, rep_levels, data_buffer_ptr)));
+ static_cast<const ::arrow::TimestampType&>(*target_type),
array.length(),
+ num_levels, def_levels, rep_levels, buffer)));
} else {
const uint8_t* valid_bits = data.null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
- writer, static_cast<const ::arrow::TimestampType&>(*target_type),
array->length(),
- num_levels, def_levels, rep_levels, valid_bits, data.offset(),
data_buffer_ptr)));
+ static_cast<const ::arrow::TimestampType&>(*target_type),
array.length(),
+ num_levels, def_levels, rep_levels, valid_bits, data.offset(),
buffer)));
}
- PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
-// ----------------------------------------------------------------------
-
// This specialization seems quite similar but it significantly differs in two
points:
// * offset is added at the most latest time to the pointer as we have
sub-byte access
// * Arrow data is stored bitwise thus we cannot use std::copy to transform
from
// ArrowType::c_type to ParquetType::c_type
template <>
-Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t
num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- RETURN_NOT_OK(data_buffer_.Resize(array->length()));
- auto data = static_cast<const BooleanArray*>(array.get());
- auto data_ptr = reinterpret_cast<const uint8_t*>(data->values()->data());
- auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
- auto writer =
reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer);
+Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ bool* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));
+
+ const auto& data = static_cast<const BooleanArray&>(array);
+ auto values = reinterpret_cast<const uint8_t*>(data.values()->data());
int buffer_idx = 0;
- int64_t offset = array->offset();
- for (int i = 0; i < data->length(); i++) {
- if (!data->IsNull(i)) {
- buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
+ int64_t offset = array.offset();
+ for (int i = 0; i < data.length(); i++) {
+ if (!data.IsNull(i)) {
+ buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t
num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
-
- PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels,
nullptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t
num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(ByteArray)));
- auto data = static_cast<const BinaryArray*>(array.get());
- auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ ByteArray* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer));
+
+ const auto& data = static_cast<const BinaryArray&>(array);
+
// In the case of an array consisting of only empty strings or all null,
- // data->data() points already to a nullptr, thus data->data()->data() will
+ // data.data() points already to a nullptr, thus data.data()->data() will
// segfault.
- const uint8_t* data_ptr = nullptr;
- if (data->value_data()) {
- data_ptr = reinterpret_cast<const uint8_t*>(data->value_data()->data());
- DCHECK(data_ptr != nullptr);
+ const uint8_t* values = nullptr;
+ if (data.value_data()) {
+ values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
+ DCHECK(values != nullptr);
}
- auto writer =
reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
// Slice offset is accounted for in raw_value_offsets
- const int32_t* value_offset = data->raw_value_offsets();
+ const int32_t* value_offset = data.raw_value_offsets();
- if (writer->descr()->schema_node()->is_required() || (data->null_count() ==
0)) {
+ if (writer_->descr()->schema_node()->is_required() || (data.null_count() ==
0)) {
// no nulls, just dump the data
- for (int64_t i = 0; i < data->length(); i++) {
- buffer_ptr[i] =
- ByteArray(value_offset[i + 1] - value_offset[i], data_ptr +
value_offset[i]);
+ for (int64_t i = 0; i < data.length(); i++) {
+ buffer[i] =
+ ByteArray(value_offset[i + 1] - value_offset[i], values +
value_offset[i]);
}
} else {
int buffer_idx = 0;
- for (int64_t i = 0; i < data->length(); i++) {
- if (!data->IsNull(i)) {
- buffer_ptr[buffer_idx++] =
- ByteArray(value_offset[i + 1] - value_offset[i], data_ptr +
value_offset[i]);
+ for (int64_t i = 0; i < data.length(); i++) {
+ if (!data.IsNull(i)) {
+ buffer[buffer_idx++] =
+ ByteArray(value_offset[i + 1] - value_offset[i], values +
value_offset[i]);
}
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<FLBAType,
::arrow::FixedSizeBinaryType>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t
num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
- const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
+Status ArrowColumnWriter::TypedWriteBatch<FLBAType,
::arrow::FixedSizeBinaryType>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
const int64_t length = data.length();
- auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+ FLBA* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
- auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
-
- if (writer->descr()->schema_node()->is_required() || data.null_count() == 0)
{
+ if (writer_->descr()->schema_node()->is_required() || data.null_count() ==
0) {
// no nulls, just dump the data
// todo(advancedxy): use a writeBatch to avoid this step
for (int64_t i = 0; i < length; i++) {
- buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
+ buffer[i] = FixedLenByteArray(data.GetValue(i));
}
} else {
int buffer_idx = 0;
for (int64_t i = 0; i < length; i++) {
if (!data.IsNull(i)) {
- buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
+ buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
}
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
+
+ return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
}
template <>
-Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
- ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t
num_levels,
- const int16_t* def_levels, const int16_t* rep_levels) {
- const auto& data = static_cast<const Decimal128Array&>(*array);
+Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
+ const Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
+ const auto& data = static_cast<const Decimal128Array&>(array);
const int64_t length = data.length();
- // TODO(phillipc): This is potentially very wasteful if we have a lot of
nulls
- std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
-
- RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
- auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
-
- auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+ FLBA* buffer;
+ RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
const auto& decimal_type = static_cast<const
::arrow::Decimal128Type&>(*data.type());
const int32_t offset =
decimal_type.byte_width() - DecimalSize(decimal_type.precision());
const bool does_not_have_nulls =
- writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+ writer_->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+ // TODO(phillipc): This is potentially very wasteful if we have a lot of
nulls
+ std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
// TODO(phillipc): Look into whether our compilers will perform loop
unswitching so we
// don't have to keep writing two loops to handle the case where we know
there are no
@@ -825,7 +810,7 @@ Status FileWriter::Impl::TypedWriteBatch<FLBAType,
::arrow::Decimal128Type>(
auto unsigned_64_bit = reinterpret_cast<const
uint64_t*>(data.GetValue(i));
big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
big_endian_values[j + 1] =
::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
- buffer_ptr[i] = FixedLenByteArray(
+ buffer[i] = FixedLenByteArray(
reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
}
} else {
@@ -834,77 +819,30 @@ Status FileWriter::Impl::TypedWriteBatch<FLBAType,
::arrow::Decimal128Type>(
auto unsigned_64_bit = reinterpret_cast<const
uint64_t*>(data.GetValue(i));
big_endian_values[j] =
::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
big_endian_values[j + 1] =
::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
- buffer_ptr[buffer_idx++] = FixedLenByteArray(
+ buffer[buffer_idx++] = FixedLenByteArray(
reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
j += 2;
}
}
}
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
- PARQUET_CATCH_NOT_OK(writer->Close());
- return Status::OK();
-}
-
-// End of column type specializations
-// ----------------------------------------------------------------------
-
-Status FileWriter::Impl::Close() {
- if (!closed_) {
- // Make idempotent
- closed_ = true;
- if (row_group_writer_ != nullptr) {
- PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
- }
- PARQUET_CATCH_NOT_OK(writer_->Close());
- }
- return Status::OK();
-}
-Status FileWriter::NewRowGroup(int64_t chunk_size) {
- return impl_->NewRowGroup(chunk_size);
+ return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
}
-Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
- // DictionaryArrays are not yet handled with a fast path. To still support
- // writing them as a workaround, we convert them back to their non-dictionary
- // representation.
- if (data.type()->id() == ::arrow::Type::DICTIONARY) {
- const ::arrow::DictionaryType& dict_type =
- static_cast<const ::arrow::DictionaryType&>(*data.type());
-
- // TODO(ARROW-1648): Remove this special handling once we require an Arrow
- // version that has this fixed.
- if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
- return WriteColumnChunk(::arrow::NullArray(data.length()));
- }
-
- FunctionContext ctx(pool_);
- std::shared_ptr<Array> plain_array;
- RETURN_NOT_OK(
- Cast(&ctx, data, dict_type.dictionary()->type(), CastOptions(),
&plain_array));
- return WriteColumnChunk(*plain_array);
- }
-
- ColumnWriter* column_writer;
- PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+Status ArrowColumnWriter::Write(const Array& data) {
+ ::arrow::Type::type values_type;
+ RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
- int current_column_idx = row_group_writer_->current_column();
- std::shared_ptr<::arrow::Schema> arrow_schema;
- RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
- writer_->key_value_metadata(),
&arrow_schema));
- std::shared_ptr<Buffer> def_levels_buffer;
- std::shared_ptr<Buffer> rep_levels_buffer;
+ std::shared_ptr<Array> _values_array;
int64_t values_offset;
- ::arrow::Type::type values_type;
int64_t num_levels;
int64_t num_values;
+ LevelBuilder level_builder(ctx_->memory_pool);
- std::shared_ptr<Array> _values_array;
- LevelBuilder level_builder(pool_);
+ std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
RETURN_NOT_OK(level_builder.GenerateLevels(
- data, arrow_schema->field(0), &values_offset, &values_type, &num_values,
- &num_levels, &def_levels_buffer, &rep_levels_buffer, &_values_array));
+ data, field_, &values_offset, &num_values, &num_levels,
ctx_->def_levels_buffer,
+ &def_levels_buffer, &rep_levels_buffer, &_values_array));
const int16_t* def_levels = nullptr;
if (def_levels_buffer) {
def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
@@ -915,27 +853,26 @@ Status FileWriter::Impl::WriteColumnChunk(const Array&
data) {
}
std::shared_ptr<Array> values_array = _values_array->Slice(values_offset,
num_values);
-#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
- case ::arrow::Type::ArrowEnum: \
- return TypedWriteBatch<ParquetType, ::arrow::ArrowType>( \
- column_writer, values_array, num_levels, def_levels, rep_levels);
+#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)
\
+ case ::arrow::Type::ArrowEnum:
\
+ return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array,
num_levels, \
+ def_levels,
rep_levels);
switch (values_type) {
case ::arrow::Type::UINT32: {
if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
// Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we
need
// to use the larger Int64Type to store them lossless.
- return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
- column_writer, values_array, num_levels, def_levels, rep_levels);
+ return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array,
num_levels,
+ def_levels,
rep_levels);
} else {
- return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
- column_writer, values_array, num_levels, def_levels, rep_levels);
+ return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array,
num_levels,
+ def_levels,
rep_levels);
}
}
WRITE_BATCH_CASE(NA, NullType, Int32Type)
case ::arrow::Type::TIMESTAMP:
- return WriteTimestamps(column_writer, values_array, num_levels,
def_levels,
- rep_levels);
+ return WriteTimestamps(*values_array, num_levels, def_levels,
rep_levels);
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
@@ -957,20 +894,130 @@ Status FileWriter::Impl::WriteColumnChunk(const Array&
data) {
default:
break;
}
-
- PARQUET_CATCH_NOT_OK(column_writer->Close());
std::stringstream ss;
ss << "Data type not supported as list value: " <<
values_array->type()->ToString();
return Status::NotImplemented(ss.str());
}
-Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) {
- return impl_->WriteColumnChunk(array);
+} // namespace
+
+// ----------------------------------------------------------------------
+// FileWriter implementation
+
+class FileWriter::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+ : writer_(std::move(writer)),
+ row_group_writer_(nullptr),
+ column_write_context_(pool, arrow_properties.get()),
+ arrow_properties_(arrow_properties),
+ closed_(false) {}
+
+ Status NewRowGroup(int64_t chunk_size) {
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
+ PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
+ return Status::OK();
+ }
+
+ Status Close() {
+ if (!closed_) {
+ // Make idempotent
+ closed_ = true;
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
+ PARQUET_CATCH_NOT_OK(writer_->Close());
+ }
+ return Status::OK();
+ }
+
+ Status WriteColumnChunk(const Array& data) {
+ // A bit awkward here since cannot instantiate ChunkedArray from const
Array&
+ ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
+ auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
+ return WriteColumnChunk(chunked_array, 0, data.length());
+ }
+
+ Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t
offset,
+ const int64_t size) {
+ // DictionaryArrays are not yet handled with a fast path. To still support
+ // writing them as a workaround, we convert them back to their
non-dictionary
+ // representation.
+ if (data->type()->id() == ::arrow::Type::DICTIONARY) {
+ const ::arrow::DictionaryType& dict_type =
+ static_cast<const ::arrow::DictionaryType&>(*data->type());
+
+ // TODO(ARROW-1648): Remove this special handling once we require an
Arrow
+ // version that has this fixed.
+ if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
+ auto null_array = std::make_shared<::arrow::NullArray>(data->length());
+ return WriteColumnChunk(*null_array);
+ }
+
+ FunctionContext ctx(this->memory_pool());
+ ::arrow::compute::Datum cast_input(data);
+ ::arrow::compute::Datum cast_output;
+ RETURN_NOT_OK(Cast(&ctx, cast_input, dict_type.dictionary()->type(),
CastOptions(),
+ &cast_output));
+ return WriteColumnChunk(cast_output.chunked_array(), 0, data->length());
+ }
+
+ ColumnWriter* column_writer;
+ PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+
+ // TODO(wesm): This trick to construct a schema for one Parquet root node
+ // will not work for arbitrary nested data
+ int current_column_idx = row_group_writer_->current_column();
+ std::shared_ptr<::arrow::Schema> arrow_schema;
+ RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx -
1},
+ writer_->key_value_metadata(),
&arrow_schema));
+
+ ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
+ arrow_schema->field(0));
+
+ RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
+ return arrow_writer.Close();
+ }
+
+ const WriterProperties& properties() const { return *writer_->properties(); }
+
+ ::arrow::MemoryPool* memory_pool() const { return
column_write_context_.memory_pool; }
+
+ virtual ~Impl() {}
+
+ private:
+ friend class FileWriter;
+
+ std::unique_ptr<ParquetFileWriter> writer_;
+ RowGroupWriter* row_group_writer_;
+ ColumnWriterContext column_write_context_;
+ std::shared_ptr<ArrowWriterProperties> arrow_properties_;
+ bool closed_;
+};
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+ return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
+ return impl_->WriteColumnChunk(data);
+}
+
+Status FileWriter::WriteColumnChunk(const
std::shared_ptr<::arrow::ChunkedArray>& data,
+ const int64_t offset, const int64_t size) {
+ return impl_->WriteColumnChunk(data, offset, size);
+}
+
+Status FileWriter::WriteColumnChunk(const
std::shared_ptr<::arrow::ChunkedArray>& data) {
+ return WriteColumnChunk(data, 0, data->length());
}
Status FileWriter::Close() { return impl_->Close(); }
-MemoryPool* FileWriter::memory_pool() const { return impl_->pool_; }
+MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
FileWriter::~FileWriter() {}
@@ -1020,14 +1067,9 @@ Status FileWriter::Open(const ::arrow::Schema& schema,
::arrow::MemoryPool* pool
return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}
-Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
- // TODO(ARROW-232) Support writing chunked arrays.
- for (int i = 0; i < table.num_columns(); i++) {
- if (table.column(i)->data()->num_chunks() != 1) {
- return Status::NotImplemented("No support for writing chunked arrays
yet.");
- }
- }
+namespace {} // namespace
+Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
if (chunk_size <= 0) {
return Status::Invalid("chunk size per row_group must be greater than 0");
} else if (chunk_size > impl_->properties().max_row_group_length()) {
@@ -1040,9 +1082,9 @@ Status FileWriter::WriteTable(const Table& table, int64_t
chunk_size) {
RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
for (int i = 0; i < table.num_columns(); i++) {
- std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
- array = array->Slice(offset, size);
- RETURN_NOT_OK_ELSE(WriteColumnChunk(*array),
PARQUET_IGNORE_NOT_OK(Close()));
+ auto chunked_data = table.column(i)->data();
+ RETURN_NOT_OK_ELSE(WriteColumnChunk(chunked_data, offset, size),
+ PARQUET_IGNORE_NOT_OK(Close()));
}
}
return Status::OK();
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 24ba72d4..a432850c 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -133,15 +133,16 @@ class PARQUET_EXPORT FileWriter {
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);
- /**
- * Write a Table to Parquet.
- *
- * The table shall only consist of columns of primitive type or of primitive
lists.
- */
+ /// \brief Write a Table to Parquet.
::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
::arrow::Status NewRowGroup(int64_t chunk_size);
::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
+
+ /// \brief Write ColumnChunk in row group using slice of a ChunkedArray
+ ::arrow::Status WriteColumnChunk(const
std::shared_ptr<::arrow::ChunkedArray>& data,
+ const int64_t offset, const int64_t size);
+ ::arrow::Status WriteColumnChunk(const
std::shared_ptr<::arrow::ChunkedArray>& data);
::arrow::Status Close();
virtual ~FileWriter();
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 7b8c7755..6b847480 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -117,6 +117,8 @@ class PARQUET_EXPORT ColumnWriter {
int64_t rows_written() const { return rows_written_; }
+ const WriterProperties* properties() { return properties_; }
+
protected:
virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index 7b748120..72c71c66 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -64,9 +64,9 @@ RowGroupReader::RowGroupReader(std::unique_ptr<Contents>
contents)
: contents_(std::move(contents)) {}
std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
- DCHECK(i < metadata()->num_columns())
- << "The RowGroup only has " << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+ << metadata()->num_columns()
+ << "columns, requested column: " << i;
const ColumnDescriptor* descr = metadata()->schema()->Column(i);
std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -76,9 +76,9 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
}
std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
- DCHECK(i < metadata()->num_columns())
- << "The RowGroup only has " << metadata()->num_columns()
- << "columns, requested column: " << i;
+ DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+ << metadata()->num_columns()
+ << "columns, requested column: " << i;
return contents_->GetColumnPageReader(i);
}
@@ -302,9 +302,9 @@ std::shared_ptr<FileMetaData> ParquetFileReader::metadata()
const {
}
std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
- DCHECK(i < metadata()->num_row_groups())
- << "The file only has " << metadata()->num_row_groups()
- << "row groups, requested reader for: " << i;
+ DCHECK(i < metadata()->num_row_groups()) << "The file only has "
+ << metadata()->num_row_groups()
+ << "row groups, requested reader
for: " << i;
return contents_->GetRowGroup(i);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [C++] Write Arrow tables with chunked columns
> ---------------------------------------------
>
> Key: PARQUET-1092
> URL: https://issues.apache.org/jira/browse/PARQUET-1092
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-cpp
> Reporter: Wes McKinney
> Assignee: Wes McKinney
> Fix For: cpp-1.4.0
>
>
> Requires incoming patch in ARROW-232
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)