dongxiao1198 commented on code in PR #150:
URL: https://github.com/apache/iceberg-cpp/pull/150#discussion_r2230419010
##########
src/iceberg/manifest_reader_internal.cc:
##########
@@ -134,112 +227,336 @@ Result<std::vector<ManifestFile>>
ParseManifestListEntry(ArrowSchema* schema,
auto field_name = field.value().get().name();
bool required = !field.value().get().optional();
auto view_of_column = array_view.children[idx];
-
-#define PARSE_PRIMITIVE_FIELD(item, type)
\
- for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
\
- if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
\
- auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);
\
- manifest_files[row_idx].item = static_cast<type>(value);
\
- } else if (required) {
\
- return InvalidManifestList("Field {} is required but null at row {}",
field_name, \
- row_idx);
\
- }
\
- }
-
switch (idx) {
case 0:
- for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
- if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
- auto value = ArrowArrayViewGetStringUnsafe(view_of_column,
row_idx);
- std::string path_str(value.data, value.size_bytes);
- manifest_files[row_idx].manifest_path = path_str;
- }
- }
+ PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path,
view_of_column);
break;
case 1:
- PARSE_PRIMITIVE_FIELD(manifest_length, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length,
view_of_column,
+ int64_t);
break;
case 2:
- PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id,
view_of_column,
+ int32_t);
break;
case 3:
- for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
- if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
- auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);
- manifest_files[row_idx].content =
static_cast<ManifestFile::Content>(value);
- }
- }
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
+ ManifestFile::Content);
break;
case 4:
- PARSE_PRIMITIVE_FIELD(sequence_number, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number,
view_of_column,
+ int64_t);
break;
case 5:
- PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number,
view_of_column,
+ int64_t);
break;
case 6:
- PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id,
view_of_column,
+ int64_t);
break;
case 7:
- PARSE_PRIMITIVE_FIELD(added_files_count, int32_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count,
view_of_column,
+ int32_t);
break;
case 8:
- PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
+ view_of_column, int32_t);
break;
case 9:
- PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count,
view_of_column,
+ int32_t);
break;
case 10:
- PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count,
view_of_column,
+ int64_t);
break;
case 11:
- PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count,
view_of_column,
+ int64_t);
break;
case 12:
- PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t);
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count,
view_of_column,
+ int64_t);
break;
case 13:
ICEBERG_RETURN_UNEXPECTED(
ParsePartitionFieldSummaryList(view_of_column, manifest_files));
break;
case 14:
- for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
- if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
- auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column,
row_idx);
- manifest_files[row_idx].key_metadata = std::vector<uint8_t>(
- buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
+ PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata,
view_of_column);
+ break;
+ case 15:
+ PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id,
view_of_column,
+ int64_t);
+ break;
+ default:
+ return InvalidManifestList("Unsupported field: {} in manifest file.",
field_name);
+ }
+ }
+ return manifest_files;
+}
+
+Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
+ std::vector<ManifestEntry>& manifest_entries) {
+ if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
+ auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
+ manifest_entries[row_idx].data_file->partition.emplace_back(
+ Literal::Boolean(value != 0));
+ } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_INT32) {
+ auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Int(value));
+ } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_INT64) {
+ auto value = ArrowArrayViewGetIntUnsafe(view_of_partition, row_idx);
+
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Long(value));
+ } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_FLOAT) {
+ auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Float(value));
+ } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_DOUBLE) {
+ auto value = ArrowArrayViewGetDoubleUnsafe(view_of_partition, row_idx);
+
manifest_entries[row_idx].data_file->partition.emplace_back(Literal::Double(value));
+ } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_STRING) {
+ auto value = ArrowArrayViewGetStringUnsafe(view_of_partition, row_idx);
+ manifest_entries[row_idx].data_file->partition.emplace_back(
+ Literal::String(std::string(value.data, value.size_bytes)));
+ } else if (view_of_partition->storage_type ==
ArrowType::NANOARROW_TYPE_BINARY) {
+ auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_partition, row_idx);
+ manifest_entries[row_idx].data_file->partition.emplace_back(
+ Literal::Binary(std::vector<uint8_t>(buffer.data.as_char,
+ buffer.data.as_char +
buffer.size_bytes)));
+ } else {
+ return InvalidManifest("Unsupported field type: {} in data file
partition.",
+
static_cast<int32_t>(view_of_partition->storage_type));
+ }
+ return {};
+}
+
+Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
+ ArrowArrayView* view_of_column,
+ std::vector<ManifestEntry>& manifest_entries) {
+ if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
+ return InvalidManifest("DataFile field should be a struct.");
+ }
+ if (view_of_column->n_children != data_file_schema->fields().size()) {
+ return InvalidManifest("DataFile schema size:{} not match with ArrayArray
columns:{}",
+ data_file_schema->fields().size(),
view_of_column->n_children);
+ }
+ for (int64_t col_idx = 0; col_idx < view_of_column->n_children; ++col_idx) {
+ auto field_name =
data_file_schema->GetFieldByIndex(col_idx).value().get().name();
+ auto required =
!data_file_schema->GetFieldByIndex(col_idx).value().get().optional();
+ auto view_of_file_field = view_of_column->children[col_idx];
+ auto manifest_entry_count = view_of_file_field->length;
+
+ switch (col_idx) {
+ case 0:
+ PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content,
+ view_of_file_field, DataFile::Content);
+ break;
+ case 1:
+ PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path,
+ view_of_file_field);
+ break;
+ case 2:
+ for (size_t row_idx = 0; row_idx < view_of_file_field->length;
row_idx++) {
+ if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
+ auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field,
row_idx);
+ std::string_view path_str(value.data, value.size_bytes);
+
ICEBERG_ASSIGN_OR_RAISE(manifest_entries[row_idx].data_file->file_format,
+ FileFormatTypeFromString(path_str));
+ }
+ }
+ break;
+ case 3: {
+ if (view_of_file_field->storage_type !=
ArrowType::NANOARROW_TYPE_STRUCT) {
+ return InvalidManifest("Field:{} should be a list.", field_name);
+ }
+ auto view_of_partition = view_of_file_field->children[0];
+ for (size_t row_idx = 0; row_idx < view_of_partition->length;
row_idx++) {
+ if (ArrowArrayViewIsNull(view_of_partition, row_idx)) {
+ break;
}
+ ICEBERG_RETURN_UNEXPECTED(
+ ParseLiteral(view_of_partition, row_idx, manifest_entries));
}
+ } break;
+ case 4:
+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->record_count,
+ view_of_file_field, int64_t);
+ break;
+ case 5:
+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->file_size_in_bytes,
+ view_of_file_field, int64_t);
+ break;
+ case 6:
+ // key&value should have the same offset
+ // HACK(xiao.dong) workaround for arrow bug:
+ // ArrowArrayViewListChildOffset can not get the correct offset for map
+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
+ manifest_entry_count, view_of_file_field);
+ break;
+ case 7:
+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
+ manifest_entry_count, view_of_file_field);
+ break;
+ case 8:
+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
+ manifest_entry_count, view_of_file_field);
+ break;
+ case 9:
+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
+ manifest_entry_count, view_of_file_field);
+ break;
+ case 10:
+
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
+ manifest_entry_count, view_of_file_field);
+ break;
+ case 11:
+
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
+ manifest_entry_count, view_of_file_field);
+ break;
+ case 12:
+ PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata,
+ view_of_file_field);
+ break;
+ case 13:
+ PARSE_INTEGER_VECTOR_FIELD(
+ manifest_entries[manifest_idx].data_file->split_offsets,
manifest_entry_count,
+ view_of_file_field, int64_t);
+ break;
+ case 14:
+
PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
+ manifest_entry_count, view_of_file_field,
int32_t);
break;
case 15:
- PARSE_PRIMITIVE_FIELD(first_row_id, int64_t);
+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
+ view_of_file_field, int32_t);
+ break;
+ case 16:
+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->first_row_id,
+ view_of_file_field, int64_t);
+ break;
+ case 17:
+
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->referenced_data_file,
+ view_of_file_field);
+ break;
+ case 18:
+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_offset,
+ view_of_file_field, int64_t);
+ break;
+ case 19:
+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content_size_in_bytes,
+ view_of_file_field, int64_t);
break;
default:
- return InvalidManifestList("Unsupported type: {}", field_name);
+ return InvalidManifest("Unsupported field: {} in data file.",
field_name);
}
}
- return manifest_files;
+ return {};
}
-Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
return {}; }
+Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
+ ArrowArray* array_in,
+ const Schema&
iceberg_schema) {
+ if (schema->n_children != array_in->n_children) {
+ return InvalidManifest("Columns size not match between schema:{} and
array:{}",
+ schema->n_children, array_in->n_children);
+ }
+ if (iceberg_schema.fields().size() != array_in->n_children) {
+ return InvalidManifest("Columns size not match between schema:{} and
array:{}",
+ iceberg_schema.fields().size(),
array_in->n_children);
+ }
+
+ ArrowError error;
+ ArrowArrayView array_view;
+ auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
+ NANOARROW_RETURN_IF_NOT_OK(status, error);
+ internal::ArrowArrayViewGuard view_guard(&array_view);
+ status = ArrowArrayViewSetArray(&array_view, array_in, &error);
+ NANOARROW_RETURN_IF_NOT_OK(status, error);
+ status = ArrowArrayViewValidate(&array_view,
NANOARROW_VALIDATION_LEVEL_FULL, &error);
+ NANOARROW_RETURN_IF_NOT_OK(status, error);
+
+ std::vector<ManifestEntry> manifest_entries;
+ manifest_entries.resize(array_in->length);
+ for (size_t i = 0; i < array_in->length; i++) {
+ manifest_entries[i].data_file = std::make_shared<DataFile>();
+ }
+
+ for (int64_t idx = 0; idx < array_in->n_children; idx++) {
+ const auto& field = iceberg_schema.GetFieldByIndex(idx);
+ if (!field.has_value()) {
+ return InvalidManifest("Field not found in schema: {}", idx);
+ }
+ auto field_name = field.value().get().name();
+ bool required = !field.value().get().optional();
+ auto view_of_column = array_view.children[idx];
+
+ switch (idx) {
+ case 0:
+ PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column,
+ ManifestStatus);
+ break;
+ case 1:
+ PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id,
view_of_column,
+ int64_t);
+ break;
+ case 2:
+ PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].sequence_number,
view_of_column,
+ int64_t);
+ break;
+ case 3:
+ PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].file_sequence_number,
+ view_of_column, int64_t);
+ break;
+ case 4: {
+ auto data_file_schema =
+ dynamic_pointer_cast<StructType>(field.value().get().type());
+ ICEBERG_RETURN_UNEXPECTED(
+ ParseDataFile(data_file_schema, view_of_column, manifest_entries));
+ break;
+ }
+ default:
+ return InvalidManifest("Unsupported field: {} in manifest entry.",
field_name);
+ }
+ }
+ return manifest_entries;
+}
+
+Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
+ std::vector<ManifestEntry> manifest_entries;
+ ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
+ internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+ while (true) {
+ ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
+ if (result.has_value()) {
+ internal::ArrowArrayGuard array_guard(&result.value());
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto parse_result,
+ ParseManifestEntry(&arrow_schema, &result.value(), *schema_));
+ manifest_entries.insert(manifest_entries.end(),
+ std::make_move_iterator(parse_result.begin()),
+ std::make_move_iterator(parse_result.end()));
+ } else {
+ // eof
+ break;
+ }
+ }
+ return manifest_entries;
+}
Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
std::vector<ManifestFile> manifest_files;
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
while (true) {
- auto result = reader_->Next();
- if (!result.has_value()) {
- return InvalidManifestList("Failed to read manifest list entry:{}",
- result.error().message);
- }
- if (result.value().has_value()) {
- internal::ArrowArrayGuard array_guard(&result.value().value());
+ ICEBERG_ASSIGN_OR_RAISE(auto result, reader_->Next());
+ if (result.has_value()) {
Review Comment:
Since it is defined as optional<xx>, we should check it anyway. When the reader
reaches the end of the file, it will be nullopt.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]