wgtmac commented on code in PR #216:
URL: https://github.com/apache/iceberg-cpp/pull/216#discussion_r2366611922
##########
src/iceberg/file_writer.h:
##########
@@ -65,7 +65,7 @@ class ICEBERG_EXPORT Writer {
/// \brief Write arrow data to the file.
///
/// \return Status of write results.
- virtual Status Write(ArrowArray data) = 0;
+ virtual Status Write(ArrowArray& data) = 0;
Review Comment:
`ArrowArray` is a simple C struct and will be moved away (i.e. become invalid)
after the call, so it looks a little strange to use a reference here.
Perhaps change it to `ArrowArray*` and document the behavior?
##########
src/iceberg/arrow/nanoarrow_error_transform_internal.h:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error msg: {}", error.message); \
Review Comment:
```suggestion
return InvalidArrowData("nanoarrow error: {}", error.message); \
```
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
Review Comment:
```suggestion
#include <nanoarrow/nanoarrow.h>
```
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
Review Comment:
```suggestion
const StructType& data_file_type,
const DataFile& file);
```
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
+
+namespace {
+static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = std::make_shared<ArrowArray>();
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayInitFromSchema(array_.get(), &schema_, &error), error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(array_.get()));
+ return {};
+}
+
+Result<std::shared_ptr<ArrowArray>> ManifestAdapter::FinishAppending() {
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayFinishBuildingDefault(array_.get(), &error), error);
+ return array_;
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view
value) {
+ ArrowStringView view(value.data(), value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value) {
+ ArrowBufferViewData data;
+ data.as_char = reinterpret_cast<const char*>(value.data());
+ ArrowBufferView view(data, value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+ return {};
+}
+
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+ if (array_ != nullptr && array_->release != nullptr) {
+ ArrowArrayRelease(array_.get());
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryStructType() {
+ if (partition_spec_ == nullptr) {
+ return ManifestEntry::TypeFromPartitionType(nullptr);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_schema,
partition_spec_->partition_schema());
+ return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+}
+
+Status ManifestEntryAdapter::AppendPartition(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+ const std::vector<Literal>& partitions) {
+ if (arrow_array->n_children != partition_type->fields().size()) [[unlikely]]
{
+ return InvalidManifest("Arrow array of partition does not match partition
type.");
+ }
+ if (partitions.size() != partition_type->fields().size()) [[unlikely]] {
+ return InvalidManifest("Literal list of partition does not match partition
type.");
+ }
+ auto fields = partition_type->fields();
+
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& partition = partitions[i];
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+ if (partition.IsNull()) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ continue;
+ }
+ switch (field.type()->type_id()) {
+ case TypeId::kBoolean:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
+ static_cast<uint64_t>(std::get<bool>(partition.value()) == true ?
1L : 0L)));
+ break;
+ case TypeId::kInt:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+ break;
+ case TypeId::kLong:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<int64_t>(partition.value())));
+ break;
+ case TypeId::kFloat:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<double>(std::get<float>(partition.value()))));
+ break;
+ case TypeId::kDouble:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<double>(partition.value())));
+ break;
+ case TypeId::kString:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<std::string>(partition.value())));
+ break;
+ case TypeId::kFixed:
+ case TypeId::kBinary:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
std::get<std::vector<uint8_t>>(partition.value())));
+ break;
+ case TypeId::kDate:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+ break;
+ case TypeId::kTime:
+ case TypeId::kTimestamp:
+ case TypeId::kTimestampTz:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<int64_t>(partition.value())));
+ break;
+ case TypeId::kDecimal:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<std::array<uint8_t,
16>>(partition.value())));
Review Comment:
```suggestion
AppendField(array, std::get<int128_t>(partition.value())));
```
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>& list_value);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>& list_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>& map_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, std::vector<uint8_t>>&
map_value);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(const
ManifestEntry& entry);
+ virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file);
+
+ protected:
+ std::shared_ptr<PartitionSpec> partition_spec_;
+ std::shared_ptr<Schema> manifest_schema_;
+ std::unordered_map<std::string, std::string> metadata_;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestFile`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
public:
ManifestFileAdapter() = default;
- ~ManifestFileAdapter() override = default;
+ ~ManifestFileAdapter() override;
virtual Status Append(const ManifestFile& file) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return
manifest_list_schema_; }
+
+ protected:
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestFile& file);
+ static Status AppendPartitions(ArrowArray* arrow_array,
+ const std::shared_ptr<ListType>&
partition_type,
+ const std::vector<PartitionFieldSummary>&
partitions);
+
+ virtual Result<int64_t> GetSequenceNumber(const ManifestFile& file);
+ virtual Result<int64_t> GetWrappedMinSequenceNumber(const ManifestFile&
file);
Review Comment:
Same question for removing `Wrapped` from the name
##########
src/iceberg/partition_spec.h:
##########
@@ -67,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable
{
/// \brief Get a view of the partition fields.
std::span<const PartitionField> fields() const;
+ Result<std::shared_ptr<Schema>> partition_schema();
Review Comment:
```suggestion
Result<std::shared_ptr<Schema>> GetPartitionSchema();
```
This is not a trivial getter, so we cannot use the snake_case form.
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
Review Comment:
```suggestion
std::span<const uint8_t> value);
```
`std::span` is a view, so we don't need to pass it by const ref.
##########
test/manifest_reader_writer_test.cc:
##########
@@ -144,6 +166,20 @@ TEST_F(ManifestReaderV1Test, PartitionedTest) {
partition_schema);
}
+TEST_F(ManifestReaderV1Test, WritePartitionedTest) {
+ iceberg::SchemaField partition_field(1000, "order_ts_hour",
iceberg::int32(), true);
+ auto partition_schema =
+ std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
+ auto identity_transform = Transform::Identity();
+ std::vector<PartitionField> fields{
+ PartitionField(1000, 1000, "order_ts_hour", identity_transform)};
+ auto partition_spec = std::make_shared<PartitionSpec>(partition_schema, 1,
fields);
+
+ auto expected_entries = PreparePartitionedTestData();
+ auto write_manifest_path = CreateNewTempFilePath();
+ TestWriteManifest(write_manifest_path, partition_spec, expected_entries);
Review Comment:
Is `TestManifestReadingByPath` missing here? Same for the ones below.
##########
src/iceberg/partition_spec.h:
##########
@@ -67,6 +69,8 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable
{
/// \brief Get a view of the partition fields.
std::span<const PartitionField> fields() const;
+ Result<std::shared_ptr<Schema>> partition_schema();
Review Comment:
BTW, can we add a test case for this?
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
Review Comment:
```suggestion
virtual Result<std::shared_ptr<StructType>> GetManifestEntryType();
```
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
Review Comment:
It is not recommended to use smart pointers on a C struct, especially when
we don't add a custom deleter. Can we revert this line?
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
Review Comment:
```suggestion
static Status AppendPartition(ArrowArray* arrow_array,
const StructType& partition_type,
const std::vector<Literal>& partition_value);
```
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
Review Comment:
```suggestion
```
We should use `ICEBERG_NANOARROW_RETURN_IF_NOT_OK` consistently.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
+
+namespace {
+static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = std::make_shared<ArrowArray>();
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayInitFromSchema(array_.get(), &schema_, &error), error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(array_.get()));
Review Comment:
```suggestion
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayStartAppending(array_.get()));
```
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
Review Comment:
As commented below, I don't think `std::shared_ptr<ArrowArray>` is a good
idea.
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>& list_value);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>& list_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>& map_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, std::vector<uint8_t>>&
map_value);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(const
ManifestEntry& entry);
+ virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file);
Review Comment:
```suggestion
virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
const DataFile& file);
virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
const DataFile& file);
virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
const DataFile& file);
virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
const DataFile& file);
```
Should we also remove `Wrapped` from these functions?
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
+
+namespace {
+static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = std::make_shared<ArrowArray>();
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayInitFromSchema(array_.get(), &schema_, &error), error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(array_.get()));
+ return {};
+}
+
+Result<std::shared_ptr<ArrowArray>> ManifestAdapter::FinishAppending() {
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayFinishBuildingDefault(array_.get(), &error), error);
+ return array_;
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view
value) {
+ ArrowStringView view(value.data(), value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value) {
+ ArrowBufferViewData data;
+ data.as_char = reinterpret_cast<const char*>(value.data());
+ ArrowBufferView view(data, value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+ return {};
+}
+
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+ if (array_ != nullptr && array_->release != nullptr) {
+ ArrowArrayRelease(array_.get());
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryStructType() {
+ if (partition_spec_ == nullptr) {
+ return ManifestEntry::TypeFromPartitionType(nullptr);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_schema,
partition_spec_->partition_schema());
+ return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+}
+
+Status ManifestEntryAdapter::AppendPartition(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+ const std::vector<Literal>& partitions) {
+ if (arrow_array->n_children != partition_type->fields().size()) [[unlikely]]
{
+ return InvalidManifest("Arrow array of partition does not match partition
type.");
+ }
+ if (partitions.size() != partition_type->fields().size()) [[unlikely]] {
+ return InvalidManifest("Literal list of partition does not match partition
type.");
+ }
+ auto fields = partition_type->fields();
+
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& partition = partitions[i];
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+ if (partition.IsNull()) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ continue;
+ }
+ switch (field.type()->type_id()) {
+ case TypeId::kBoolean:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
+ static_cast<uint64_t>(std::get<bool>(partition.value()) == true ?
1L : 0L)));
Review Comment:
```suggestion
static_cast<uint64_t>(std::get<bool>(partition.value()) ? 1L : 0L)));
```
##########
src/iceberg/manifest_adapter.h:
##########
@@ -33,34 +41,107 @@ class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<std::shared_ptr<ArrowArray>> FinishAppending();
int64_t size() const { return size_; }
protected:
- ArrowArray array_;
+ static Status AppendField(ArrowArray* arrowArray, int64_t value);
+ static Status AppendField(ArrowArray* arrowArray, uint64_t value);
+ static Status AppendField(ArrowArray* arrowArray, double value);
+ static Status AppendField(ArrowArray* arrowArray, std::string_view value);
+ static Status AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value);
+
+ protected:
+ std::shared_ptr<ArrowArray> array_;
+ ArrowSchema schema_; // converted from manifest_schema_ or
manifest_list_schema_
int64_t size_ = 0;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestEntry`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryStructType();
+
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file);
+ static Status AppendPartition(ArrowArray* arrow_array,
+ const std::shared_ptr<StructType>&
partition_type,
+ const std::vector<Literal>& partitions);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>& list_value);
+ static Status AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>& list_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>& map_value);
+ static Status AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, std::vector<uint8_t>>&
map_value);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(const
ManifestEntry& entry);
+ virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file);
+ virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file);
+
+ protected:
+ std::shared_ptr<PartitionSpec> partition_spec_;
+ std::shared_ptr<Schema> manifest_schema_;
+ std::unordered_map<std::string, std::string> metadata_;
};
// \brief Implemented by different versions with different schemas to
// append a list of `ManifestFile`s to an `ArrowArray`.
class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
public:
ManifestFileAdapter() = default;
- ~ManifestFileAdapter() override = default;
+ ~ManifestFileAdapter() override;
virtual Status Append(const ManifestFile& file) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return
manifest_list_schema_; }
+
+ protected:
+ /// \brief Init version-specific schema for each version.
+ ///
+ /// \param fields_ids each version of manifest schema has schema, we will
init this
+ /// schema based on the fields_ids.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestFile& file);
+ static Status AppendPartitions(ArrowArray* arrow_array,
+ const std::shared_ptr<ListType>&
partition_type,
+ const std::vector<PartitionFieldSummary>&
partitions);
Review Comment:
```suggestion
static Status AppendPartitionSummary(ArrowArray* arrow_array,
                                     const ListType& partition_summary_type,
                                     const std::vector<PartitionFieldSummary>& summaries);
```
##########
src/iceberg/partition_spec.cc:
##########
@@ -57,6 +60,49 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
std::span<const PartitionField> PartitionSpec::fields() const { return
fields_; }
+Result<std::shared_ptr<Schema>> PartitionSpec::partition_schema() {
+ if (fields_.empty()) {
+ return nullptr;
+ }
+ {
+ std::scoped_lock<std::mutex> lock(mutex_);
+ if (partition_schema_ != nullptr) {
+ return partition_schema_;
+ }
+ }
+
+ std::vector<SchemaField> partition_fields;
+ for (const auto& partition_field : fields_) {
+ // Get the source field from the original schema by source_id
+ ICEBERG_ASSIGN_OR_RAISE(auto source_field,
+
schema_->FindFieldById(partition_field.source_id()));
+ if (!source_field.has_value()) {
+ return InvalidSchema("Cannot find source field for partition field:{}",
+ partition_field.field_id());
+ }
+ auto source_field_type = source_field.value().get().type();
+ // Bind the transform to the source field type to get the result type
+ ICEBERG_ASSIGN_OR_RAISE(auto transform_function,
+
partition_field.transform()->Bind(source_field_type));
+
+ auto result_type = transform_function->ResultType();
+
+ // Create the partition field with the transform result type
+ // Partition fields are always optional (can be null)
+ partition_fields.emplace_back(partition_field.field_id(),
+ std::string(partition_field.name()),
+ std::move(result_type),
+ true // optional
Review Comment:
```suggestion
/*optional=*/true
```
##########
src/iceberg/partition_spec.cc:
##########
@@ -57,6 +60,49 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
std::span<const PartitionField> PartitionSpec::fields() const { return
fields_; }
+Result<std::shared_ptr<Schema>> PartitionSpec::partition_schema() {
+ if (fields_.empty()) {
+ return nullptr;
+ }
+ {
+ std::scoped_lock<std::mutex> lock(mutex_);
Review Comment:
Can we use `std::once_flag` and `std::call_once` instead?
##########
src/iceberg/partition_spec.cc:
##########
@@ -57,6 +60,49 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
std::span<const PartitionField> PartitionSpec::fields() const { return
fields_; }
+Result<std::shared_ptr<Schema>> PartitionSpec::partition_schema() {
+ if (fields_.empty()) {
+ return nullptr;
+ }
+ {
+ std::scoped_lock<std::mutex> lock(mutex_);
+ if (partition_schema_ != nullptr) {
+ return partition_schema_;
+ }
+ }
+
+ std::vector<SchemaField> partition_fields;
+ for (const auto& partition_field : fields_) {
+ // Get the source field from the original schema by source_id
+ ICEBERG_ASSIGN_OR_RAISE(auto source_field,
+
schema_->FindFieldById(partition_field.source_id()));
+ if (!source_field.has_value()) {
+ return InvalidSchema("Cannot find source field for partition field:{}",
Review Comment:
Add a TODO comment to use the unknown type when the source field is missing.
##########
src/iceberg/v2_metadata.cc:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/v2_metadata.h"
+
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+
+namespace iceberg {
+
+Status ManifestEntryAdapterV2::Init() {
+ static std::unordered_set<int32_t> kManifestEntryFieldIds{
+ ManifestEntry::kStatus.field_id(),
+ ManifestEntry::kSnapshotId.field_id(),
+ ManifestEntry::kDataFileFieldId,
+ ManifestEntry::kSequenceNumber.field_id(),
+ ManifestEntry::kFileSequenceNumber.field_id(),
Review Comment:
```suggestion
ManifestEntry::kSequenceNumber.field_id(),
ManifestEntry::kFileSequenceNumber.field_id(),
ManifestEntry::kDataFileFieldId,
```
Reorder them to match the actual schema.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
+
+namespace {
+static constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+}
+
+namespace iceberg {
+
+Status ManifestAdapter::StartAppending() {
+ if (size_ > 0) {
+ return InvalidArgument("Adapter buffer not empty, cannot start
appending.");
+ }
+ array_ = std::make_shared<ArrowArray>();
+ size_ = 0;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayInitFromSchema(array_.get(), &schema_, &error), error);
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(array_.get()));
+ return {};
+}
+
+Result<std::shared_ptr<ArrowArray>> ManifestAdapter::FinishAppending() {
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayFinishBuildingDefault(array_.get(), &error), error);
+ return array_;
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view
value) {
+ ArrowStringView view(value.data(), value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, view));
+ return {};
+}
+
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+ const std::span<const uint8_t>& value) {
+ ArrowBufferViewData data;
+ data.as_char = reinterpret_cast<const char*>(value.data());
+ ArrowBufferView view(data, value.size());
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, view));
+ return {};
+}
+
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+ if (array_ != nullptr && array_->release != nullptr) {
+ ArrowArrayRelease(array_.get());
+ }
+ if (schema_.release != nullptr) {
+ ArrowSchemaRelease(&schema_);
+ }
+}
+
+Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryStructType() {
+ if (partition_spec_ == nullptr) {
+ return ManifestEntry::TypeFromPartitionType(nullptr);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_schema,
partition_spec_->partition_schema());
+ return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+}
+
+Status ManifestEntryAdapter::AppendPartition(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+ const std::vector<Literal>& partitions) {
+ if (arrow_array->n_children != partition_type->fields().size()) [[unlikely]]
{
+ return InvalidManifest("Arrow array of partition does not match partition
type.");
+ }
+ if (partitions.size() != partition_type->fields().size()) [[unlikely]] {
+ return InvalidManifest("Literal list of partition does not match partition
type.");
+ }
+ auto fields = partition_type->fields();
+
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& partition = partitions[i];
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+ if (partition.IsNull()) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ continue;
+ }
+ switch (field.type()->type_id()) {
+ case TypeId::kBoolean:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
+ static_cast<uint64_t>(std::get<bool>(partition.value()) == true ?
1L : 0L)));
+ break;
+ case TypeId::kInt:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+ break;
+ case TypeId::kLong:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<int64_t>(partition.value())));
+ break;
+ case TypeId::kFloat:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<double>(std::get<float>(partition.value()))));
+ break;
+ case TypeId::kDouble:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<double>(partition.value())));
+ break;
+ case TypeId::kString:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<std::string>(partition.value())));
+ break;
+ case TypeId::kFixed:
+ case TypeId::kBinary:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
std::get<std::vector<uint8_t>>(partition.value())));
+ break;
+ case TypeId::kDate:
+ ICEBERG_RETURN_UNEXPECTED(AppendField(
+ array,
static_cast<int64_t>(std::get<int32_t>(partition.value()))));
+ break;
+ case TypeId::kTime:
+ case TypeId::kTimestamp:
+ case TypeId::kTimestampTz:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<int64_t>(partition.value())));
+ break;
+ case TypeId::kDecimal:
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, std::get<std::array<uint8_t,
16>>(partition.value())));
+ break;
+ case TypeId::kUuid:
+ case TypeId::kStruct:
+ case TypeId::kList:
+ case TypeId::kMap:
+ // TODO(xiao.dong) currently literal does not support those types
+ default:
+ return InvalidManifest("Unsupported partition type: {}",
field.ToString());
+ }
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+ const std::vector<int32_t>&
list_value) {
+ auto list_array = arrow_array->children[0];
+ for (const auto& value : list_value) {
+ NANOARROW_RETURN_IF_FAILED(
+ ArrowArrayAppendInt(list_array, static_cast<int64_t>(value)));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+ const std::vector<int64_t>&
list_value) {
+ auto list_array = arrow_array->children[0];
+ for (const auto& value : list_value) {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(list_array, value));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array,
+ const std::map<int32_t, int64_t>&
map_value) {
+ auto map_array = arrow_array->children[0];
+ if (map_array->n_children != 2) {
+ return InvalidManifest("Invalid map array.");
+ }
+ for (const auto& [key, value] : map_value) {
+ auto key_array = map_array->children[0];
+ auto value_array = map_array->children[1];
+ ICEBERG_RETURN_UNEXPECTED(AppendField(key_array,
static_cast<int64_t>(key)));
+ ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendMap(
+ ArrowArray* arrow_array, const std::map<int32_t, std::vector<uint8_t>>&
map_value) {
+ auto map_array = arrow_array->children[0];
+ if (map_array->n_children != 2) {
+ return InvalidManifest("Invalid map array.");
+ }
+ for (const auto& [key, value] : map_value) {
+ auto key_array = map_array->children[0];
+ auto value_array = map_array->children[1];
+ ICEBERG_RETURN_UNEXPECTED(AppendField(key_array,
static_cast<int64_t>(key)));
+ ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Status ManifestEntryAdapter::AppendDataFile(
+ ArrowArray* arrow_array, const std::shared_ptr<StructType>& data_file_type,
+ const std::shared_ptr<DataFile>& file) {
+ auto fields = data_file_type->fields();
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& field = fields[i];
+ auto array = arrow_array->children[i];
+
+ switch (field.field_id()) {
+ case 134: // content (optional int32)
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, static_cast<int64_t>(file->content)));
+ break;
+ case 100: // file_path (required string)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_path));
+ break;
+ case 101: // file_format (required string)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
ToString(file->file_format)));
+ break;
+ case 102: // partition (required struct)
+ {
+ auto partition_type =
internal::checked_pointer_cast<StructType>(field.type());
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendPartition(array, partition_type, file->partition));
+ } break;
+ case 103: // record_count (required int64)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->record_count));
+ break;
+ case 104: // file_size_in_bytes (required int64)
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file->file_size_in_bytes));
+ break;
+ case 105: // block_size_in_bytes (compatible in v1)
+ // always 64MB for v1
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, kBlockSizeInBytesV1));
+ break;
+ case 108: // column_sizes (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->column_sizes));
+ break;
+ case 109: // value_counts (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->value_counts));
+ break;
+ case 110: // null_value_counts (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->null_value_counts));
+ break;
+ case 137: // nan_value_counts (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->nan_value_counts));
+ break;
+ case 125: // lower_bounds (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->lower_bounds));
+ break;
+ case 128: // upper_bounds (optional map)
+ ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->upper_bounds));
+ break;
+ case 131: // key_metadata (optional binary)
+ if (!file->key_metadata.empty()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->key_metadata));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 132: // split_offsets (optional list)
+ ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->split_offsets));
+ break;
+ case 135: // equality_ids (optional list)
+ ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->equality_ids));
+ break;
+ case 140: // sort_order_id (optional int32)
+ if (file->sort_order_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<int64_t>(file->sort_order_id.value())));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 142: // first_row_id (optional int64)
+ if (file->first_row_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file->first_row_id.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 143: // referenced_data_file (optional string)
+ {
+ ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file,
+ GetWrappedReferenceDataFile(file));
+ if (referenced_data_file.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
referenced_data_file.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ }
+ case 144: // content_offset (optional int64)
+ if (file->content_offset.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
file->content_offset.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 145: // content_size_in_bytes (optional int64)
+ if (file->content_size_in_bytes.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array, file->content_size_in_bytes.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ default:
+ return InvalidManifest("Unknown data file field id: {} ",
field.field_id());
+ }
+ }
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+ return {};
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetSequenceNumber(
+ const ManifestEntry& entry) {
+ return entry.sequence_number;
+}
+
+Result<std::optional<std::string>>
ManifestEntryAdapter::GetWrappedReferenceDataFile(
+ const std::shared_ptr<DataFile>& file) {
+ return file->referenced_data_file;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedFirstRowId(
+ const std::shared_ptr<DataFile>& file) {
+ return file->first_row_id;
+}
+
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentOffset(
+ const std::shared_ptr<DataFile>& file) {
+ return file->content_offset;
+}
+
+Result<std::optional<int64_t>>
ManifestEntryAdapter::GetWrappedContentSizeInBytes(
+ const std::shared_ptr<DataFile>& file) {
+ return file->content_size_in_bytes;
+}
+
+Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+ const auto& fields = manifest_schema_->fields();
+ for (int32_t i = 0; i < fields.size(); i++) {
+ const auto& field = fields[i];
+ auto array = array_->children[i];
+
+ switch (field.field_id()) {
+ case 0: // status (required int32)
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendField(array,
static_cast<int64_t>(static_cast<int32_t>(entry.status))));
+ break;
+ case 1: // snapshot_id (optional int64)
+ if (entry.snapshot_id.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendField(array,
entry.snapshot_id.value()));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+ }
+ break;
+ case 2: // data_file (required struct)
+ if (entry.data_file) {
+ // Get the data file type from the field
+ auto data_file_type =
internal::checked_pointer_cast<StructType>(field.type());
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendDataFile(array, data_file_type, entry.data_file));
+ } else {
+ NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
Review Comment:
We should return an error here, since data_file is a required field.
##########
src/iceberg/manifest_adapter.cc:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "nanoarrow/nanoarrow.h"
+
+#define NANOARROW_RETURN_IF_FAILED(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error code: {}", status); \
+ }
+
+namespace {
+// Value written for the v1-only block_size_in_bytes column: always 64 MiB.
+constexpr int64_t kBlockSizeInBytesV1 = int64_t{64} << 20;
+}  // namespace
+
+namespace iceberg {
+
+/// \brief Allocate a fresh Arrow array and put it into appending mode.
+///
+/// \return InvalidArgument while `size_` is still positive, InvalidArrowData when
+/// nanoarrow fails to initialize the array from `schema_` or to start appending.
+Status ManifestAdapter::StartAppending() {
+  const bool has_pending_rows = size_ > 0;
+  if (has_pending_rows) {
+    return InvalidArgument("Adapter buffer not empty, cannot start appending.");
+  }
+
+  array_ = std::make_shared<ArrowArray>();
+  size_ = 0;
+
+  ArrowArray* raw_array = array_.get();
+  ArrowError init_error;
+  ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+      ArrowArrayInitFromSchema(raw_array, &schema_, &init_error), init_error);
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(raw_array));
+  return {};
+}
+
+/// \brief Finish building the current Arrow array and hand it back.
+///
+/// \return The shared `array_` on success, or InvalidArrowData carrying the
+/// nanoarrow error message when finishing the build fails.
+Result<std::shared_ptr<ArrowArray>> ManifestAdapter::FinishAppending() {
+  ArrowArray* raw_array = array_.get();
+  ArrowError build_error;
+  ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
+      ArrowArrayFinishBuildingDefault(raw_array, &build_error), build_error);
+  return array_;
+}
+
+/// \brief Append one signed integer to `arrowArray`.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(arrowArray, value));
+  return {};
+}
+
+/// \brief Append one unsigned integer to `arrowArray`.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, uint64_t value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendUInt(arrowArray, value));
+  return {};
+}
+
+/// \brief Append one double to `arrowArray`.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, double value) {
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendDouble(arrowArray, value));
+  return {};
+}
+
+/// \brief Append one string to `arrowArray`; `value` is passed to nanoarrow as a
+/// pointer/length view.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray, std::string_view value) {
+  ArrowStringView string_view{value.data(), static_cast<int64_t>(value.size())};
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendString(arrowArray, string_view));
+  return {};
+}
+
+/// \brief Append one binary value to `arrowArray`; `value` is passed to nanoarrow
+/// as a pointer/length view.
+Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
+                                    const std::span<const uint8_t>& value) {
+  ArrowBufferView bytes_view;
+  bytes_view.data.as_char = reinterpret_cast<const char*>(value.data());
+  bytes_view.size_bytes = static_cast<int64_t>(value.size());
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendBytes(arrowArray, bytes_view));
+  return {};
+}
+
+/// \brief Release the Arrow array and schema held by this adapter.
+///
+/// Each release is skipped when the object is absent or its `release` callback is
+/// already null.
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+  ArrowArray* raw_array = array_.get();
+  const bool array_is_live = raw_array != nullptr && raw_array->release != nullptr;
+  if (array_is_live) {
+    ArrowArrayRelease(raw_array);
+  }
+
+  const bool schema_is_live = schema_.release != nullptr;
+  if (schema_is_live) {
+    ArrowSchemaRelease(&schema_);
+  }
+}
+
+/// \brief Build the manifest entry struct type for this adapter's partition spec.
+///
+/// Without a partition spec the type is derived from a null partition type;
+/// otherwise from the spec's partition schema. Errors from computing that schema
+/// are propagated.
+Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryStructType() {
+  if (partition_spec_ != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_schema, partition_spec_->partition_schema());
+    return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
+  }
+  return ManifestEntry::TypeFromPartitionType(nullptr);
+}
+
+/// \brief Append one partition tuple as a single element of a struct array.
+///
+/// `partitions[i]` is written into `arrow_array->children[i]` according to the
+/// type of `partition_type->fields()[i]`; null literals become Arrow nulls.
+///
+/// \return InvalidManifest when the child count or literal count does not match
+/// the partition type, or when a field has a type with no supported literal;
+/// InvalidArrowData when nanoarrow rejects an append.
+Status ManifestEntryAdapter::AppendPartition(
+    ArrowArray* arrow_array, const std::shared_ptr<StructType>& partition_type,
+    const std::vector<Literal>& partitions) {
+  auto fields = partition_type->fields();
+  const size_t field_count = fields.size();
+
+  if (arrow_array->n_children != field_count) [[unlikely]] {
+    return InvalidManifest("Arrow array of partition does not match partition type.");
+  }
+  if (partitions.size() != field_count) [[unlikely]] {
+    return InvalidManifest("Literal list of partition does not match partition type.");
+  }
+
+  for (size_t idx = 0; idx < field_count; ++idx) {
+    const auto& literal = partitions[idx];
+    const auto& field = fields[idx];
+    ArrowArray* child = arrow_array->children[idx];
+
+    if (literal.IsNull()) {
+      NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(child, 1));
+      continue;
+    }
+
+    switch (field.type()->type_id()) {
+      case TypeId::kBoolean: {
+        // Booleans go through the unsigned overload as 0 or 1.
+        const uint64_t as_uint = std::get<bool>(literal.value()) ? 1 : 0;
+        ICEBERG_RETURN_UNEXPECTED(AppendField(child, as_uint));
+        break;
+      }
+      // 32-bit integral literals are widened to int64 before appending.
+      case TypeId::kInt:
+      case TypeId::kDate:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child, static_cast<int64_t>(std::get<int32_t>(literal.value()))));
+        break;
+      // 64-bit integral literals are appended as-is.
+      case TypeId::kLong:
+      case TypeId::kTime:
+      case TypeId::kTimestamp:
+      case TypeId::kTimestampTz:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child, std::get<int64_t>(literal.value())));
+        break;
+      case TypeId::kFloat:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child, static_cast<double>(std::get<float>(literal.value()))));
+        break;
+      case TypeId::kDouble:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child, std::get<double>(literal.value())));
+        break;
+      case TypeId::kString:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child, std::get<std::string>(literal.value())));
+        break;
+      case TypeId::kFixed:
+      case TypeId::kBinary:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child, std::get<std::vector<uint8_t>>(literal.value())));
+        break;
+      case TypeId::kDecimal:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child, std::get<std::array<uint8_t, 16>>(literal.value())));
+        break;
+      case TypeId::kUuid:
+      case TypeId::kStruct:
+      case TypeId::kList:
+      case TypeId::kMap:
+        // TODO(xiao.dong) currently literal does not support those types
+      default:
+        return InvalidManifest("Unsupported partition type: {}", field.ToString());
+    }
+  }
+
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+/// \brief Append `list_value` as one element of a list array of 32-bit integers.
+///
+/// Every value is widened to int64 and appended to the list's child array, then
+/// the list element is finished. An empty vector produces an empty list element.
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+                                        const std::vector<int32_t>& list_value) {
+  ArrowArray* items = arrow_array->children[0];
+  for (const int32_t item : list_value) {
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(items, static_cast<int64_t>(item)));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+/// \brief Append `list_value` as one element of a list array of 64-bit integers.
+///
+/// Every value is appended to the list's child array, then the list element is
+/// finished. An empty vector produces an empty list element.
+Status ManifestEntryAdapter::AppendList(ArrowArray* arrow_array,
+                                        const std::vector<int64_t>& list_value) {
+  ArrowArray* items = arrow_array->children[0];
+  for (const int64_t item : list_value) {
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendInt(items, item));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+/// \brief Append `map_value` as one element of a map<int32, int64> array.
+///
+/// `arrow_array->children[0]` is the entries struct holding the key and value
+/// child arrays. After each key/value pair is appended, the entries struct
+/// element is finished so that the struct's length advances; nanoarrow derives
+/// the map element's offset from that length, so skipping this step would record
+/// every map as empty.
+///
+/// \return InvalidManifest when the entries struct does not have exactly two
+/// children; InvalidArrowData when nanoarrow rejects an append.
+Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array,
+                                       const std::map<int32_t, int64_t>& map_value) {
+  ArrowArray* entries = arrow_array->children[0];
+  if (entries->n_children != 2) {
+    return InvalidManifest("Invalid map array.");
+  }
+  ArrowArray* key_array = entries->children[0];
+  ArrowArray* value_array = entries->children[1];
+  for (const auto& [key, value] : map_value) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast<int64_t>(key)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+    // Commit this key/value pair as one entry of the entries struct.
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(entries));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+/// \brief Append `map_value` as one element of a map<int32, binary> array.
+///
+/// Same layout and per-entry handling as the int64-valued overload: each
+/// key/value pair is appended to the children of the entries struct and the
+/// struct element is finished before the map element itself is finished.
+///
+/// \return InvalidManifest when the entries struct does not have exactly two
+/// children; InvalidArrowData when nanoarrow rejects an append.
+Status ManifestEntryAdapter::AppendMap(
+    ArrowArray* arrow_array, const std::map<int32_t, std::vector<uint8_t>>& map_value) {
+  ArrowArray* entries = arrow_array->children[0];
+  if (entries->n_children != 2) {
+    return InvalidManifest("Invalid map array.");
+  }
+  ArrowArray* key_array = entries->children[0];
+  ArrowArray* value_array = entries->children[1];
+  for (const auto& [key, value] : map_value) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast<int64_t>(key)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
+    // Commit this key/value pair as one entry of the entries struct.
+    NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(entries));
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+/// \brief Append one data file as a single element of the data_file struct array.
+///
+/// Walks the fields of `data_file_type` in order and writes the matching member
+/// of `file` into the child array at the same index. Unset optional scalars are
+/// written as nulls. first_row_id, referenced_data_file, content_offset and
+/// content_size_in_bytes are all read through their GetWrapped* hooks instead of
+/// directly from `file`, so the four hooks are honoured consistently.
+///
+/// \return InvalidManifest for a field id that is not handled here,
+/// InvalidArrowData when nanoarrow rejects an append, or any error returned by a
+/// hook or a nested append helper.
+Status ManifestEntryAdapter::AppendDataFile(
+    ArrowArray* arrow_array, const std::shared_ptr<StructType>& data_file_type,
+    const std::shared_ptr<DataFile>& file) {
+  auto fields = data_file_type->fields();
+  for (size_t i = 0; i < fields.size(); i++) {
+    const auto& field = fields[i];
+    auto array = arrow_array->children[i];
+
+    switch (field.field_id()) {
+      case 134:  // content (optional int32)
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, static_cast<int64_t>(file->content)));
+        break;
+      case 100:  // file_path (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_path));
+        break;
+      case 101:  // file_format (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, ToString(file->file_format)));
+        break;
+      case 102:  // partition (required struct)
+      {
+        auto partition_type = internal::checked_pointer_cast<StructType>(field.type());
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendPartition(array, partition_type, file->partition));
+        break;
+      }
+      case 103:  // record_count (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->record_count));
+        break;
+      case 104:  // file_size_in_bytes (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->file_size_in_bytes));
+        break;
+      case 105:  // block_size_in_bytes (compatible in v1)
+        // always 64MB for v1
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, kBlockSizeInBytesV1));
+        break;
+      case 108:  // column_sizes (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->column_sizes));
+        break;
+      case 109:  // value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->value_counts));
+        break;
+      case 110:  // null_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->null_value_counts));
+        break;
+      case 137:  // nan_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->nan_value_counts));
+        break;
+      case 125:  // lower_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->lower_bounds));
+        break;
+      case 128:  // upper_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(array, file->upper_bounds));
+        break;
+      case 131:  // key_metadata (optional binary)
+        // Empty key metadata is written as null rather than as empty bytes.
+        if (!file->key_metadata.empty()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, file->key_metadata));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 132:  // split_offsets (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->split_offsets));
+        break;
+      case 135:  // equality_ids (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(array, file->equality_ids));
+        break;
+      case 140:  // sort_order_id (optional int32)
+        if (file->sort_order_id.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(array, static_cast<int64_t>(file->sort_order_id.value())));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 142:  // first_row_id (optional int64)
+      {
+        ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetWrappedFirstRowId(file));
+        if (first_row_id.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, first_row_id.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      }
+      case 143:  // referenced_data_file (optional string)
+      {
+        ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file,
+                                GetWrappedReferenceDataFile(file));
+        if (referenced_data_file.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, referenced_data_file.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      }
+      case 144:  // content_offset (optional int64)
+      {
+        ICEBERG_ASSIGN_OR_RAISE(auto content_offset, GetWrappedContentOffset(file));
+        if (content_offset.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, content_offset.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      }
+      case 145:  // content_size_in_bytes (optional int64)
+      {
+        ICEBERG_ASSIGN_OR_RAISE(auto content_size_in_bytes,
+                                GetWrappedContentSizeInBytes(file));
+        if (content_size_in_bytes.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, content_size_in_bytes.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      }
+      default:
+        return InvalidManifest("Unknown data file field id: {} ", field.field_id());
+    }
+  }
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
+  return {};
+}
+
+/// \brief Sequence number to write for a manifest entry.
+///
+/// \return `entry.sequence_number` as stored on the entry (may be empty).
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetSequenceNumber(
+    const ManifestEntry& entry) {
+  const auto& sequence_number = entry.sequence_number;
+  return sequence_number;
+}
+
+/// \brief Referenced data file path to write for a data file.
+///
+/// \return `file->referenced_data_file` as stored on the file (may be empty).
+Result<std::optional<std::string>> ManifestEntryAdapter::GetWrappedReferenceDataFile(
+    const std::shared_ptr<DataFile>& file) {
+  const auto& referenced_data_file = file->referenced_data_file;
+  return referenced_data_file;
+}
+
+/// \brief First row id to write for a data file.
+///
+/// \return `file->first_row_id` as stored on the file (may be empty).
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedFirstRowId(
+    const std::shared_ptr<DataFile>& file) {
+  const auto& first_row_id = file->first_row_id;
+  return first_row_id;
+}
+
+/// \brief Content offset to write for a data file.
+///
+/// \return `file->content_offset` as stored on the file (may be empty).
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentOffset(
+    const std::shared_ptr<DataFile>& file) {
+  const auto& content_offset = file->content_offset;
+  return content_offset;
+}
+
+/// \brief Content size in bytes to write for a data file.
+///
+/// \return `file->content_size_in_bytes` as stored on the file (may be empty).
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetWrappedContentSizeInBytes(
+    const std::shared_ptr<DataFile>& file) {
+  const auto& content_size_in_bytes = file->content_size_in_bytes;
+  return content_size_in_bytes;
+}
+
+/// \brief Append one manifest entry as a row of the array being built.
+///
+/// Walks the fields of `manifest_schema_` in order and writes the matching member
+/// of `entry` into the child array at the same index, then finishes the row and
+/// increments `size_`. Unset optional members are written as nulls. The sequence
+/// number is read through GetSequenceNumber() rather than directly from `entry`.
+///
+/// \return InvalidManifest when `entry.data_file` is missing (the field is
+/// required, so it is never written as null) or when the schema contains a field
+/// id that is not handled here; InvalidArrowData when nanoarrow rejects an
+/// append; or any error returned by a nested append helper.
+Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+  const auto& fields = manifest_schema_->fields();
+  for (size_t i = 0; i < fields.size(); i++) {
+    const auto& field = fields[i];
+    auto array = array_->children[i];
+
+    switch (field.field_id()) {
+      case 0:  // status (required int32)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<int64_t>(static_cast<int32_t>(entry.status))));
+        break;
+      case 1:  // snapshot_id (optional int64)
+        if (entry.snapshot_id.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, entry.snapshot_id.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 2:  // data_file (required struct)
+      {
+        // A required field must not be silently written as null.
+        if (!entry.data_file) {
+          return InvalidManifest("Missing required data_file field from manifest entry.");
+        }
+        // Get the data file type from the field
+        auto data_file_type = internal::checked_pointer_cast<StructType>(field.type());
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendDataFile(array, data_file_type, entry.data_file));
+        break;
+      }
+      case 3:  // sequence_number (optional int64)
+      {
+        ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry));
+        if (sequence_num.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      }
+      case 4:  // file_sequence_number (optional int64)
+        if (entry.file_sequence_number.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(array, entry.file_sequence_number.value()));
+        } else {
+          NANOARROW_RETURN_IF_FAILED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      default:
+        return InvalidManifest("Unknown manifest entry field id: {}", field.field_id());
+    }
+  }
+
+  NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(array_.get()));
+  size_++;
+  return {};
+}
+
+Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>&
fields_ids) {
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_schema,
GetManifestEntryStructType())
+ auto fields_span = manifest_entry_schema->fields();
+ std::vector<SchemaField> fields;
+ // TODO(xiao.dong) make this a common function to recursive handle
Review Comment:
```suggestion
// TODO(xiao.dong) make this a common function to recursively handle
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]