Repository: parquet-cpp Updated Branches: refs/heads/master fe8f98d0e -> c41a718da
PARQUET-890: Support I/O of DATE columns in parquet_arrow Also fixes a bug on reading INT96 timestamps. Author: Korn, Uwe <[email protected]> Closes #266 from xhochy/PARQUET-890 and squashes the following commits: 8481c2c [Korn, Uwe] ninja lint 666d41b [Korn, Uwe] PARQUET-890: Support I/O of DATE columns in parquet_arrow Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c41a718d Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c41a718d Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c41a718d Branch: refs/heads/master Commit: c41a718dae9c60465ea0d8c99d6e3bdca11f802f Parents: fe8f98d Author: Korn, Uwe <[email protected]> Authored: Tue Mar 7 15:21:51 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Tue Mar 7 15:21:51 2017 -0500 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 13 ++- src/parquet/arrow/arrow-schema-test.cc | 11 ++- src/parquet/arrow/reader.cc | 49 +++++++++- src/parquet/arrow/schema.cc | 18 +++- src/parquet/arrow/test-util.h | 49 +++++++++- src/parquet/arrow/writer.cc | 106 ++++++++++++++------- 6 files changed, 200 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 2dfdbd2..a0a39f1 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -149,6 +149,15 @@ struct test_traits<::arrow::TimestampType> { const int64_t test_traits<::arrow::TimestampType>::value(14695634030000); template <> +struct test_traits<::arrow::DateType> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::DATE; + static int64_t const value; +}; + +const int64_t test_traits<::arrow::DateType>::value(14688000000000); + +template <> struct test_traits<::arrow::FloatType> { static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; static constexpr LogicalType::type logical_enum = LogicalType::NONE; @@ -309,8 +318,8 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, - ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, - ::arrow::StringType, ::arrow::BinaryType> + ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::DateType, ::arrow::FloatType, + ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/arrow-schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 43e57d8..8db792f 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -98,6 +98,10 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false)); + parquet_fields.push_back(PrimitiveNode::Make( + "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); + arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false)); + parquet_fields.push_back( PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false)); @@ -339,9 +343,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) { TEST_F(TestConvertParquetSchema, UnsupportedThings) { std::vector<NodePtr> unsupported_nodes; - unsupported_nodes.push_back(PrimitiveNode::Make( - "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE)); - for (const NodePtr& node : unsupported_nodes) { ASSERT_RAISES(NotImplemented, ConvertSchema({node})); } @@ -394,6 +395,10 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false)); + parquet_fields.push_back(PrimitiveNode::Make( + "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); + arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false)); + parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false)); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index df34d4c..73f6d87 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -361,7 +361,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch( values_to_read, nullptr, nullptr, values, &values_read)); - int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_); + int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_; for (int64_t i = 0; i < values_read; i++) { *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]); } @@ -371,6 +371,24 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ } template <> +Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::DateType, Int32Type>( + TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) { + RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false)); + auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data()); + int64_t values_read; + PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch( + values_to_read, nullptr, nullptr, values, &values_read)); + + int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_; + for (int64_t i = 0; i < values_read; i++) { + *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000; + } + valid_bits_idx_ += values_read; + + return Status::OK(); +} + +template <> Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( TypedColumnReader<BooleanType>* reader, int64_t values_to_read, int64_t* levels_read) { @@ -464,6 +482,30 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( } template <> +Status ColumnReader::Impl::ReadNullableBatch<::arrow::DateType, Int32Type>( + TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels, + int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { + RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false)); + auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data()); + int64_t null_count; + PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, + values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count)); + + auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_); + INIT_BITSET(valid_bits_ptr_, valid_bits_idx_); + for (int64_t i = 0; i < *values_read; i++) { + if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) { + data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000; + } + READ_NEXT_BITSET(valid_bits_ptr_); + } + null_count_ += null_count; + valid_bits_idx_ += *values_read; + + return Status::OK(); +} + +template <> Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>( TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { @@ -843,6 +885,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type) TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type) TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type) + TYPED_BATCH_CASE(DATE, ::arrow::DateType, Int32Type) TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type) TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type) TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType) @@ -865,7 +908,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out break; } default: - return Status::NotImplemented(field_->type->ToString()); + std::stringstream ss; + ss << "No support for reading columns of type " << field_->type->ToString(); + return Status::NotImplemented(ss.str()); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 65e3381..0c336d9 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -77,7 +77,10 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) { *out = MakeDecimalType(node); break; default: - return Status::NotImplemented("unhandled type"); + std::stringstream ss; + ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type()) + << " for fixed-length binary array"; + return Status::NotImplemented(ss.str()); break; } @@ -104,11 +107,17 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { case LogicalType::UINT_32: *out = ::arrow::uint32(); break; + case LogicalType::DATE: + *out = ::arrow::date(); + break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); break; default: - return Status::NotImplemented("Unhandled logical type for int32"); + std::stringstream ss; + ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type()) + << " for INT32"; + return Status::NotImplemented(ss.str()); break; } return Status::OK(); @@ -129,7 +138,10 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { *out = TIMESTAMP_MS; break; default: - return Status::NotImplemented("Unhandled logical type for int64"); + std::stringstream ss; + ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type()) + << " for INT64"; + return Status::NotImplemented(ss.str()); break; } return Status::OK(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index bfc9ce1..07f1f28 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -34,6 +34,9 @@ template <typename ArrowType> using is_arrow_int = std::is_integral<typename ArrowType::c_type>; template <typename ArrowType> +using is_arrow_date = std::is_same<ArrowType, ::arrow::DateType>; + +template <typename ArrowType> using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>; template <typename ArrowType> @@ -53,10 +56,27 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA } template <class ArrowType> -typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NonNullArray( +typename std::enable_if< + is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type +NonNullArray(size_t size, std::shared_ptr<Array>* out) { + std::vector<typename ArrowType::c_type> values; + ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); + + // Passing data type so this will work with TimestampType too + ::arrow::NumericBuilder<ArrowType> builder( + ::arrow::default_memory_pool(), std::make_shared<ArrowType>()); + builder.Append(values.data(), values.size()); + return builder.Finish(out); +} + +template <class ArrowType> +typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray( size_t size, std::shared_ptr<Array>* out) { std::vector<typename ArrowType::c_type> values; ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); + for (size_t i = 0; i < size; i++) { + values[i] *= 86400000; + } // Passing data type so this will work with TimestampType too ::arrow::NumericBuilder<ArrowType> builder( @@ -107,13 +127,38 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable // This helper function only supports (size/2) nulls. template <typename ArrowType> -typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray( +typename std::enable_if< + is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type +NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) { + std::vector<typename ArrowType::c_type> values; + + // Seed is random in Arrow right now + (void)seed; + ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); + std::vector<uint8_t> valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + // Passing data type so this will work with TimestampType too + ::arrow::NumericBuilder<ArrowType> builder( + ::arrow::default_memory_pool(), std::make_shared<ArrowType>()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return builder.Finish(out); +} + +template <typename ArrowType> +typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableArray( size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) { std::vector<typename ArrowType::c_type> values; // Seed is random in Arrow right now (void)seed; ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); + for (size_t i = 0; i < size; i++) { + values[i] *= 86400000; + } std::vector<uint8_t> valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 90e037f..6d2f9c0 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -257,39 +257,16 @@ class FileWriter::Impl { int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels); template <typename ParquetType, typename ArrowType> + Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values, + int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, + const typename ArrowType::c_type* data_ptr); + + template <typename ParquetType, typename ArrowType> Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr); - // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary - // buffer - template <typename InType, typename OutType> - struct can_copy_ptr { - static constexpr bool value = - std::is_same<InType, OutType>::value || - (std::is_integral<InType>{} && std::is_integral<OutType>{} && - (sizeof(InType) == sizeof(OutType))); - }; - - template <typename InType, typename OutType, - typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr> - Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) { - *out_ptr = reinterpret_cast<const OutType*>(in_ptr); - return Status::OK(); - } - - template <typename InType, typename OutType, - typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr> - Status ConvertPhysicalType( - const InType* in_ptr, int64_t length, const OutType** out_ptr) { - RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType))); - OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data()); - std::copy(in_ptr, in_ptr + length, mutable_out_ptr); - *out_ptr = mutable_out_ptr; - return Status::OK(); - } - Status WriteColumnChunk(const Array& data); Status Close(); @@ -323,7 +300,6 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { using ArrowCType = typename ArrowType::c_type; - using ParquetCType = typename ParquetType::c_type; auto data = static_cast<const PrimitiveArray*>(array.get()); auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()); @@ -331,11 +307,8 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { // no nulls, just dump the data - const ParquetCType* data_writer_ptr = nullptr; - RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>( - data_ptr + data->offset(), array->length(), &data_writer_ptr))); - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr)); + RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(), + num_levels, def_levels, rep_levels, data_ptr + data->offset()))); } else { const uint8_t* valid_bits = data->null_bitmap_data(); RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(), @@ -347,6 +320,49 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, } template <typename ParquetType, typename ArrowType> +Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) { + using ParquetCType = typename ParquetType::c_type; + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType))); + auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data()); + std::copy(data_ptr, data_ptr + num_values, buffer_ptr); + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + return Status::OK(); +} + +template <> +Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::DateType>( + TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t))); + auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data()); + for (int i = 0; i < num_values; i++) { + buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000); + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + return Status::OK(); +} + +#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ + template <> \ + Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \ + TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \ + const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) { \ + PARQUET_CATCH_NOT_OK( \ + writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \ + return Status::OK(); \ + } + +NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) +NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) +NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) +NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float) +NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) + +template <typename ParquetType, typename ArrowType> Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, @@ -368,6 +384,27 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writ return Status::OK(); } +template <> +Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::DateType>( + TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, + int64_t valid_bits_offset, const int64_t* data_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t))); + auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data()); + INIT_BITSET(valid_bits, valid_bits_offset); + for (int i = 0; i < num_values; i++) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { + // Convert from milliseconds into days since the epoch + buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000); + } + READ_NEXT_BITSET(valid_bits); + } + PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( + num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr)); + + return Status::OK(); +} + #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ template <> \ Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \ @@ -519,6 +556,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { WRITE_BATCH_CASE(INT16, Int16Type, Int32Type) WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type) WRITE_BATCH_CASE(INT32, Int32Type, Int32Type) + WRITE_BATCH_CASE(DATE, DateType, Int32Type) WRITE_BATCH_CASE(INT64, Int64Type, Int64Type) WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type) WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
