wgtmac commented on code in PR #614:
URL: https://github.com/apache/iceberg-cpp/pull/614#discussion_r3363805243


##########
src/iceberg/catalog/rest/types.h:
##########
@@ -295,4 +299,73 @@ struct ICEBERG_REST_EXPORT OAuthTokenResponse {
   bool operator==(const OAuthTokenResponse&) const = default;
 };
 
+/// \brief Request to initiate a server-side scan planning operation.
+struct ICEBERG_REST_EXPORT PlanTableScanRequest {
+  std::optional<int64_t> snapshot_id;
+  std::vector<std::string> select;
+  std::shared_ptr<Expression> filter;
+  bool case_sensitive = true;
+  bool use_snapshot_schema = false;
+  std::optional<int64_t> start_snapshot_id;
+  std::optional<int64_t> end_snapshot_id;
+  std::vector<std::string> stats_fields;
+  std::optional<int64_t> min_rows_requested;
+
+  Status Validate() const;
+
+  bool operator==(const PlanTableScanRequest&) const;
+};
+
+/// \brief Base response containing scan tasks and delete files returned by 
scan plan
+/// endpoints.
+struct ICEBERG_REST_EXPORT BaseScanTaskResponse {
+  std::vector<std::string> plan_tasks;
+  std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  // std::unordered_map<std::string, PartitionSpec> specsById;
+
+  Status Validate() const { return {}; };
+
+  bool operator==(const BaseScanTaskResponse&) const;
+};
+
+/// \brief Response from initiating a scan planning operation, including plan 
status and
+/// initial scan tasks.
+struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse {
+  PlanStatus plan_status = PlanStatus::kCompleted;
+  std::string plan_id;
+  // TODO(sandeepg): Add credentials.

Review Comment:
   `storage-credentials` is part of the completed scan planning response, and 
Java uses it to build the scan FileIO. Dropping it here means REST catalogs 
that return temporary storage credentials can produce scan tasks that the C++ 
client cannot read.



##########
src/iceberg/catalog/rest/types.h:
##########
@@ -295,4 +299,73 @@ struct ICEBERG_REST_EXPORT OAuthTokenResponse {
   bool operator==(const OAuthTokenResponse&) const = default;
 };
 
+/// \brief Request to initiate a server-side scan planning operation.
+struct ICEBERG_REST_EXPORT PlanTableScanRequest {
+  std::optional<int64_t> snapshot_id;
+  std::vector<std::string> select;
+  std::shared_ptr<Expression> filter;
+  bool case_sensitive = true;
+  bool use_snapshot_schema = false;
+  std::optional<int64_t> start_snapshot_id;
+  std::optional<int64_t> end_snapshot_id;
+  std::vector<std::string> stats_fields;
+  std::optional<int64_t> min_rows_requested;
+
+  Status Validate() const;
+
+  bool operator==(const PlanTableScanRequest&) const;
+};
+
+/// \brief Base response containing scan tasks and delete files returned by 
scan plan
+/// endpoints.
+struct ICEBERG_REST_EXPORT BaseScanTaskResponse {
+  std::vector<std::string> plan_tasks;
+  std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  // std::unordered_map<std::string, PartitionSpec> specsById;
+
+  Status Validate() const { return {}; };
+
+  bool operator==(const BaseScanTaskResponse&) const;
+};
+
+/// \brief Response from initiating a scan planning operation, including plan 
status and
+/// initial scan tasks.
+struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse {
+  PlanStatus plan_status = PlanStatus::kCompleted;

Review Comment:
   Failed planning responses also carry the REST `error` payload. Please model 
and parse/serialize it for both `PlanTableScanResponse` and 
`FetchPlanningResultResponse`; otherwise callers lose the server error 
type/message/code.



##########
src/iceberg/catalog/rest/json_serde.cc:
##########
@@ -78,6 +83,406 @@ constexpr std::string_view kExpiresIn = "expires_in";
 constexpr std::string_view kIssuedTokenType = "issued_token_type";
 constexpr std::string_view kRefreshToken = "refresh_token";
 constexpr std::string_view kOAuthScope = "scope";
+constexpr std::string_view kPlanStatus = "status";
+constexpr std::string_view kPlanId = "plan-id";
+constexpr std::string_view kPlanTasks = "plan-tasks";
+constexpr std::string_view kFileScanTasks = "file-scan-tasks";
+constexpr std::string_view kDeleteFiles = "delete-files";
+constexpr std::string_view kSnapshotId = "snapshot-id";
+constexpr std::string_view kSelect = "select";
+constexpr std::string_view kFilter = "filter";
+constexpr std::string_view kCaseSensitive = "case-sensitive";
+constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema";
+constexpr std::string_view kStartSnapshotId = "start-snapshot-id";
+constexpr std::string_view kEndSnapshotId = "end-snapshot-id";
+constexpr std::string_view kStatsFields = "stats-fields";
+constexpr std::string_view kMinRowsRequested = "min-rows-requested";
+constexpr std::string_view kPlanTask = "plan-task";
+constexpr std::string_view kContent = "content";
+constexpr std::string_view kContentData = "data";
+constexpr std::string_view kContentPositionDeletes = "position-deletes";
+constexpr std::string_view kContentEqualityDeletes = "equality-deletes";
+constexpr std::string_view kFilePath = "file-path";
+constexpr std::string_view kFileFormat = "file-format";
+constexpr std::string_view kSpecId = "spec-id";
+constexpr std::string_view kPartition = "partition";
+constexpr std::string_view kRecordCount = "record-count";
+constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes";
+constexpr std::string_view kColumnSizes = "column-sizes";
+constexpr std::string_view kValueCounts = "value-counts";
+constexpr std::string_view kNullValueCounts = "null-value-counts";
+constexpr std::string_view kNanValueCounts = "nan-value-counts";
+constexpr std::string_view kLowerBounds = "lower-bounds";
+constexpr std::string_view kUpperBounds = "upper-bounds";
+constexpr std::string_view kKeyMetadata = "key-metadata";
+constexpr std::string_view kSplitOffsets = "split-offsets";
+constexpr std::string_view kEqualityIds = "equality-ids";
+constexpr std::string_view kSortOrderId = "sort-order-id";
+constexpr std::string_view kFirstRowId = "first-row-id";
+constexpr std::string_view kReferencedDataFile = "referenced-data-file";
+constexpr std::string_view kContentOffset = "content-offset";
+constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes";
+constexpr std::string_view kDataFile = "data-file";
+constexpr std::string_view kDeleteFileReferences = "delete-file-references";
+constexpr std::string_view kResidualFilter = "residual-filter";
+
+}  // namespace
+
+Result<DataFile> DataFileFromJson(
+    const nlohmann::json& json,
+    const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
+        partition_spec_by_id,
+    const Schema& schema) {
+  if (!json.is_object()) {
+    return JsonParseError("DataFile must be a JSON object: {}", 
SafeDumpJson(json));
+  }
+  DataFile df;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto content_str, GetJsonValue<std::string>(json, 
kContent));
+  if (content_str == kContentData) {
+    df.content = DataFile::Content::kData;
+  } else if (content_str == kContentPositionDeletes) {
+    df.content = DataFile::Content::kPositionDeletes;
+  } else if (content_str == kContentEqualityDeletes) {
+    df.content = DataFile::Content::kEqualityDeletes;
+  } else {
+    return JsonParseError("Unknown data file content: {}", content_str);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(df.file_path, GetJsonValue<std::string>(json, 
kFilePath));
+  ICEBERG_ASSIGN_OR_RAISE(auto format_str, GetJsonValue<std::string>(json, 
kFileFormat));
+  ICEBERG_ASSIGN_OR_RAISE(df.file_format, 
FileFormatTypeFromString(format_str));
+
+  if (json.contains(kSpecId) && !json.at(kSpecId).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, 
kSpecId));
+    df.partition_spec_id = spec_id;
+  }
+
+  if (json.contains(kPartition)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_vals,
+                            GetJsonValue<nlohmann::json>(json, kPartition));
+    if (!partition_vals.is_array()) {
+      return JsonParseError("PartitionValues must be a JSON array: {}",
+                            SafeDumpJson(partition_vals));
+    }
+    std::vector<Literal> literals;
+    auto it = partition_spec_by_id.find(df.partition_spec_id.value_or(-1));
+    if (it == partition_spec_by_id.end()) {
+      return JsonParseError("Invalid partition spec id: {}",
+                            df.partition_spec_id.value_or(-1));
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto struct_type, 
it->second->PartitionType(schema));
+    auto fields = struct_type->fields();
+    if (partition_vals.size() != fields.size()) {
+      return JsonParseError("Invalid partition data size: expected = {}, 
actual = {}",
+                            fields.size(), partition_vals.size());
+    }
+    for (size_t pos = 0; pos < fields.size(); ++pos) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto literal, LiteralFromJson(partition_vals[pos], 
fields[pos].type().get()));
+      literals.push_back(std::move(literal));
+    }
+    df.partition = PartitionValues(std::move(literals));
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(df.record_count, GetJsonValue<int64_t>(json, 
kRecordCount));
+  ICEBERG_ASSIGN_OR_RAISE(df.file_size_in_bytes,
+                          GetJsonValue<int64_t>(json, kFileSizeInBytes));
+
+  auto parse_int_map = [&](std::string_view key,
+                           std::map<int32_t, int64_t>& target) -> Status {
+    if (!json.contains(key) || json.at(key).is_null()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, 
key));
+    ICEBERG_ASSIGN_OR_RAISE(auto keys,
+                            
GetTypedJsonValue<std::vector<int32_t>>(map_json.at("keys")));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto values, 
GetTypedJsonValue<std::vector<int64_t>>(map_json.at("values")));
+    if (keys.size() != values.size()) {
+      return JsonParseError("'{}' map keys and values have different lengths", 
key);
+    }
+    for (size_t i = 0; i < keys.size(); ++i) {
+      target[keys[i]] = values[i];
+    }
+    return {};
+  };
+
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kColumnSizes, df.column_sizes));
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kValueCounts, df.value_counts));
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNullValueCounts, 
df.null_value_counts));
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNanValueCounts, 
df.nan_value_counts));
+
+  auto parse_binary_map = [&](std::string_view key,
+                              std::map<int32_t, std::vector<uint8_t>>& target) 
-> Status {
+    if (!json.contains(key) || json.at(key).is_null()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, 
key));
+    ICEBERG_ASSIGN_OR_RAISE(auto keys,
+                            GetJsonValue<std::vector<int32_t>>(map_json, 
"keys"));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto values, GetJsonValue<std::vector<std::vector<uint8_t>>>(map_json, 
"values"));
+    if (keys.size() != values.size()) {
+      return JsonParseError("'{}' binary map keys and values have different 
lengths",
+                            key);
+    }
+    for (size_t i = 0; i < keys.size(); ++i) {
+      target[keys[i]] = values[i];
+    }
+    return {};
+  };
+
+  ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kLowerBounds, df.lower_bounds));
+  ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kUpperBounds, df.upper_bounds));
+
+  if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.key_metadata,
+                            GetJsonValue<std::vector<uint8_t>>(json, 
kKeyMetadata));
+  }
+  if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.split_offsets,
+                            GetJsonValue<std::vector<int64_t>>(json, 
kSplitOffsets));
+  }
+  if (json.contains(kEqualityIds) && !json.at(kEqualityIds).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.equality_ids,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kEqualityIds));
+  }
+  if (json.contains(kSortOrderId) && !json.at(kSortOrderId).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.sort_order_id, GetJsonValue<int32_t>(json, 
kSortOrderId));
+  }
+  if (json.contains(kFirstRowId) && !json.at(kFirstRowId).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.first_row_id, GetJsonValue<int64_t>(json, 
kFirstRowId));
+  }
+  if (json.contains(kReferencedDataFile) && 
!json.at(kReferencedDataFile).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.referenced_data_file,
+                            GetJsonValue<std::string>(json, 
kReferencedDataFile));
+  }
+  if (json.contains(kContentOffset) && !json.at(kContentOffset).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.content_offset,
+                            GetJsonValue<int64_t>(json, kContentOffset));
+  }
+  if (json.contains(kContentSizeInBytes) && 
!json.at(kContentSizeInBytes).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.content_size_in_bytes,
+                            GetJsonValue<int64_t>(json, kContentSizeInBytes));
+  }
+
+  return df;
+}
+
+Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
+    const nlohmann::json& json,
+    const std::vector<std::shared_ptr<DataFile>>& delete_files,
+    const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
+        partition_spec_by_id,
+    const Schema& schema) {
+  if (!json.is_array()) {
+    return JsonParseError("Cannot parse file scan tasks from non-array: {}",
+                          SafeDumpJson(json));
+  }
+  std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
+  for (const auto& task_json : json) {
+    if (!task_json.is_object()) {
+      return JsonParseError("Cannot parse file scan task from a non-object: 
{}",
+                            SafeDumpJson(task_json));
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto data_file_json,
+                            GetJsonValue<nlohmann::json>(task_json, 
kDataFile));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto data_file, DataFileFromJson(data_file_json, partition_spec_by_id, 
schema));
+
+    std::vector<std::shared_ptr<DataFile>> task_delete_files;
+    if (task_json.contains(kDeleteFileReferences) &&
+        !task_json.at(kDeleteFileReferences).is_null()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto refs, GetJsonValue<std::vector<int32_t>>(
+                                             task_json, 
kDeleteFileReferences));
+      for (int32_t ref : refs) {
+        if (ref < 0 || static_cast<size_t>(ref) >= delete_files.size()) {
+          return JsonParseError(
+              "delete-file-references index {} is out of range (delete_files 
size: {})",
+              ref, delete_files.size());
+        }
+        task_delete_files.push_back(delete_files[ref]);
+      }
+    }
+
+    std::shared_ptr<Expression> residual_filter;
+    if (task_json.contains(kResidualFilter) && 
!task_json.at(kResidualFilter).is_null()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto filter_json,
+                              GetJsonValue<nlohmann::json>(task_json, 
kResidualFilter));
+      ICEBERG_ASSIGN_OR_RAISE(residual_filter, 
ExpressionFromJson(filter_json));
+    }
+
+    file_scan_tasks.push_back(std::make_shared<FileScanTask>(
+        std::make_shared<DataFile>(std::move(data_file)), 
std::move(task_delete_files),
+        std::move(residual_filter)));
+  }
+  return file_scan_tasks;
+}
+
+nlohmann::json ToJson(const DataFile& df) {

Review Comment:
   `partition` is a required ContentFile field in the REST schema, and Java's 
`ContentFileParser` writes it for scan tasks. This serializer never emits it, 
so partitioned scan tasks are not spec-compatible and lose partition data on 
round-trip.



##########
src/iceberg/catalog/rest/json_serde.cc:
##########
@@ -78,6 +83,406 @@ constexpr std::string_view kExpiresIn = "expires_in";
 constexpr std::string_view kIssuedTokenType = "issued_token_type";
 constexpr std::string_view kRefreshToken = "refresh_token";
 constexpr std::string_view kOAuthScope = "scope";
+constexpr std::string_view kPlanStatus = "status";
+constexpr std::string_view kPlanId = "plan-id";
+constexpr std::string_view kPlanTasks = "plan-tasks";
+constexpr std::string_view kFileScanTasks = "file-scan-tasks";
+constexpr std::string_view kDeleteFiles = "delete-files";
+constexpr std::string_view kSnapshotId = "snapshot-id";
+constexpr std::string_view kSelect = "select";
+constexpr std::string_view kFilter = "filter";
+constexpr std::string_view kCaseSensitive = "case-sensitive";
+constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema";
+constexpr std::string_view kStartSnapshotId = "start-snapshot-id";
+constexpr std::string_view kEndSnapshotId = "end-snapshot-id";
+constexpr std::string_view kStatsFields = "stats-fields";
+constexpr std::string_view kMinRowsRequested = "min-rows-requested";
+constexpr std::string_view kPlanTask = "plan-task";
+constexpr std::string_view kContent = "content";
+constexpr std::string_view kContentData = "data";
+constexpr std::string_view kContentPositionDeletes = "position-deletes";
+constexpr std::string_view kContentEqualityDeletes = "equality-deletes";
+constexpr std::string_view kFilePath = "file-path";
+constexpr std::string_view kFileFormat = "file-format";
+constexpr std::string_view kSpecId = "spec-id";
+constexpr std::string_view kPartition = "partition";
+constexpr std::string_view kRecordCount = "record-count";
+constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes";
+constexpr std::string_view kColumnSizes = "column-sizes";
+constexpr std::string_view kValueCounts = "value-counts";
+constexpr std::string_view kNullValueCounts = "null-value-counts";
+constexpr std::string_view kNanValueCounts = "nan-value-counts";
+constexpr std::string_view kLowerBounds = "lower-bounds";
+constexpr std::string_view kUpperBounds = "upper-bounds";
+constexpr std::string_view kKeyMetadata = "key-metadata";
+constexpr std::string_view kSplitOffsets = "split-offsets";
+constexpr std::string_view kEqualityIds = "equality-ids";
+constexpr std::string_view kSortOrderId = "sort-order-id";
+constexpr std::string_view kFirstRowId = "first-row-id";
+constexpr std::string_view kReferencedDataFile = "referenced-data-file";
+constexpr std::string_view kContentOffset = "content-offset";
+constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes";
+constexpr std::string_view kDataFile = "data-file";
+constexpr std::string_view kDeleteFileReferences = "delete-file-references";
+constexpr std::string_view kResidualFilter = "residual-filter";
+
+}  // namespace
+
+Result<DataFile> DataFileFromJson(
+    const nlohmann::json& json,
+    const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
+        partition_spec_by_id,
+    const Schema& schema) {
+  if (!json.is_object()) {
+    return JsonParseError("DataFile must be a JSON object: {}", 
SafeDumpJson(json));
+  }
+  DataFile df;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto content_str, GetJsonValue<std::string>(json, 
kContent));
+  if (content_str == kContentData) {
+    df.content = DataFile::Content::kData;
+  } else if (content_str == kContentPositionDeletes) {
+    df.content = DataFile::Content::kPositionDeletes;
+  } else if (content_str == kContentEqualityDeletes) {
+    df.content = DataFile::Content::kEqualityDeletes;
+  } else {
+    return JsonParseError("Unknown data file content: {}", content_str);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(df.file_path, GetJsonValue<std::string>(json, 
kFilePath));
+  ICEBERG_ASSIGN_OR_RAISE(auto format_str, GetJsonValue<std::string>(json, 
kFileFormat));
+  ICEBERG_ASSIGN_OR_RAISE(df.file_format, 
FileFormatTypeFromString(format_str));
+
+  if (json.contains(kSpecId) && !json.at(kSpecId).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, 
kSpecId));
+    df.partition_spec_id = spec_id;
+  }
+
+  if (json.contains(kPartition)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_vals,
+                            GetJsonValue<nlohmann::json>(json, kPartition));
+    if (!partition_vals.is_array()) {
+      return JsonParseError("PartitionValues must be a JSON array: {}",
+                            SafeDumpJson(partition_vals));
+    }
+    std::vector<Literal> literals;
+    auto it = partition_spec_by_id.find(df.partition_spec_id.value_or(-1));
+    if (it == partition_spec_by_id.end()) {
+      return JsonParseError("Invalid partition spec id: {}",
+                            df.partition_spec_id.value_or(-1));
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto struct_type, 
it->second->PartitionType(schema));
+    auto fields = struct_type->fields();
+    if (partition_vals.size() != fields.size()) {
+      return JsonParseError("Invalid partition data size: expected = {}, 
actual = {}",
+                            fields.size(), partition_vals.size());
+    }
+    for (size_t pos = 0; pos < fields.size(); ++pos) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto literal, LiteralFromJson(partition_vals[pos], 
fields[pos].type().get()));
+      literals.push_back(std::move(literal));
+    }
+    df.partition = PartitionValues(std::move(literals));
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(df.record_count, GetJsonValue<int64_t>(json, 
kRecordCount));
+  ICEBERG_ASSIGN_OR_RAISE(df.file_size_in_bytes,
+                          GetJsonValue<int64_t>(json, kFileSizeInBytes));
+
+  auto parse_int_map = [&](std::string_view key,
+                           std::map<int32_t, int64_t>& target) -> Status {
+    if (!json.contains(key) || json.at(key).is_null()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, 
key));
+    ICEBERG_ASSIGN_OR_RAISE(auto keys,
+                            
GetTypedJsonValue<std::vector<int32_t>>(map_json.at("keys")));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto values, 
GetTypedJsonValue<std::vector<int64_t>>(map_json.at("values")));
+    if (keys.size() != values.size()) {
+      return JsonParseError("'{}' map keys and values have different lengths", 
key);
+    }
+    for (size_t i = 0; i < keys.size(); ++i) {
+      target[keys[i]] = values[i];
+    }
+    return {};
+  };
+
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kColumnSizes, df.column_sizes));
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kValueCounts, df.value_counts));
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNullValueCounts, 
df.null_value_counts));
+  ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNanValueCounts, 
df.nan_value_counts));
+
+  auto parse_binary_map = [&](std::string_view key,
+                              std::map<int32_t, std::vector<uint8_t>>& target) 
-> Status {
+    if (!json.contains(key) || json.at(key).is_null()) {
+      return {};
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, 
key));
+    ICEBERG_ASSIGN_OR_RAISE(auto keys,
+                            GetJsonValue<std::vector<int32_t>>(map_json, 
"keys"));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto values, GetJsonValue<std::vector<std::vector<uint8_t>>>(map_json, 
"values"));
+    if (keys.size() != values.size()) {
+      return JsonParseError("'{}' binary map keys and values have different 
lengths",
+                            key);
+    }
+    for (size_t i = 0; i < keys.size(); ++i) {
+      target[keys[i]] = values[i];
+    }
+    return {};
+  };
+
+  ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kLowerBounds, df.lower_bounds));
+  ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kUpperBounds, df.upper_bounds));
+
+  if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.key_metadata,
+                            GetJsonValue<std::vector<uint8_t>>(json, 
kKeyMetadata));
+  }
+  if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.split_offsets,
+                            GetJsonValue<std::vector<int64_t>>(json, 
kSplitOffsets));
+  }
+  if (json.contains(kEqualityIds) && !json.at(kEqualityIds).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.equality_ids,
+                            GetJsonValue<std::vector<int32_t>>(json, 
kEqualityIds));
+  }
+  if (json.contains(kSortOrderId) && !json.at(kSortOrderId).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.sort_order_id, GetJsonValue<int32_t>(json, 
kSortOrderId));
+  }
+  if (json.contains(kFirstRowId) && !json.at(kFirstRowId).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.first_row_id, GetJsonValue<int64_t>(json, 
kFirstRowId));
+  }
+  if (json.contains(kReferencedDataFile) && 
!json.at(kReferencedDataFile).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.referenced_data_file,
+                            GetJsonValue<std::string>(json, 
kReferencedDataFile));
+  }
+  if (json.contains(kContentOffset) && !json.at(kContentOffset).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.content_offset,
+                            GetJsonValue<int64_t>(json, kContentOffset));
+  }
+  if (json.contains(kContentSizeInBytes) && 
!json.at(kContentSizeInBytes).is_null()) {
+    ICEBERG_ASSIGN_OR_RAISE(df.content_size_in_bytes,
+                            GetJsonValue<int64_t>(json, kContentSizeInBytes));
+  }
+
+  return df;
+}
+
+Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
+    const nlohmann::json& json,
+    const std::vector<std::shared_ptr<DataFile>>& delete_files,
+    const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
+        partition_spec_by_id,
+    const Schema& schema) {
+  if (!json.is_array()) {
+    return JsonParseError("Cannot parse file scan tasks from non-array: {}",
+                          SafeDumpJson(json));
+  }
+  std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
+  for (const auto& task_json : json) {
+    if (!task_json.is_object()) {
+      return JsonParseError("Cannot parse file scan task from a non-object: 
{}",
+                            SafeDumpJson(task_json));
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto data_file_json,
+                            GetJsonValue<nlohmann::json>(task_json, 
kDataFile));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto data_file, DataFileFromJson(data_file_json, partition_spec_by_id, 
schema));
+
+    std::vector<std::shared_ptr<DataFile>> task_delete_files;
+    if (task_json.contains(kDeleteFileReferences) &&
+        !task_json.at(kDeleteFileReferences).is_null()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto refs, GetJsonValue<std::vector<int32_t>>(
+                                             task_json, 
kDeleteFileReferences));
+      for (int32_t ref : refs) {
+        if (ref < 0 || static_cast<size_t>(ref) >= delete_files.size()) {
+          return JsonParseError(
+              "delete-file-references index {} is out of range (delete_files 
size: {})",
+              ref, delete_files.size());
+        }
+        task_delete_files.push_back(delete_files[ref]);
+      }
+    }
+
+    std::shared_ptr<Expression> residual_filter;
+    if (task_json.contains(kResidualFilter) && 
!task_json.at(kResidualFilter).is_null()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto filter_json,
+                              GetJsonValue<nlohmann::json>(task_json, 
kResidualFilter));
+      ICEBERG_ASSIGN_OR_RAISE(residual_filter, 
ExpressionFromJson(filter_json));
+    }
+
+    file_scan_tasks.push_back(std::make_shared<FileScanTask>(
+        std::make_shared<DataFile>(std::move(data_file)), 
std::move(task_delete_files),
+        std::move(residual_filter)));
+  }
+  return file_scan_tasks;
+}
+
+nlohmann::json ToJson(const DataFile& df) {
+  nlohmann::json json;
+  switch (df.content) {
+    case DataFile::Content::kData:
+      json[kContent] = kContentData;
+      break;
+    case DataFile::Content::kPositionDeletes:
+      json[kContent] = kContentPositionDeletes;
+      break;
+    case DataFile::Content::kEqualityDeletes:
+      json[kContent] = kContentEqualityDeletes;
+      break;
+  }
+  json[kFilePath] = df.file_path;
+  json[kFileFormat] = ToString(df.file_format);
+
+  if (df.partition_spec_id.has_value()) {
+    json[kSpecId] = df.partition_spec_id.value();
+  }
+
+  json[kRecordCount] = df.record_count;
+  json[kFileSizeInBytes] = df.file_size_in_bytes;
+
+  auto write_int_map = [&](std::string_view key, const std::map<int32_t, 
int64_t>& m) {
+    if (!m.empty()) {
+      std::vector<int32_t> keys;
+      std::vector<int64_t> values;
+      for (const auto& [k, v] : m) {
+        keys.push_back(k);
+        values.push_back(v);
+      }
+      json[key] = {{"keys", std::move(keys)}, {"values", std::move(values)}};
+    }
+  };
+
+  write_int_map(kColumnSizes, df.column_sizes);
+  write_int_map(kValueCounts, df.value_counts);
+  write_int_map(kNullValueCounts, df.null_value_counts);
+  write_int_map(kNanValueCounts, df.nan_value_counts);
+
+  auto write_binary_map = [&](std::string_view key,
+                              const std::map<int32_t, std::vector<uint8_t>>& 
m) {
+    if (!m.empty()) {
+      std::vector<int32_t> keys;
+      std::vector<std::vector<uint8_t>> values;
+      for (const auto& [k, v] : m) {
+        keys.push_back(k);
+        values.push_back(v);
+      }
+      json[key] = {{"keys", std::move(keys)}, {"values", std::move(values)}};
+    }
+  };
+
+  write_binary_map(kLowerBounds, df.lower_bounds);
+  write_binary_map(kUpperBounds, df.upper_bounds);
+
+  if (!df.key_metadata.empty()) {
+    json[kKeyMetadata] = df.key_metadata;
+  }
+  if (!df.split_offsets.empty()) {
+    json[kSplitOffsets] = df.split_offsets;
+  }
+  if (!df.equality_ids.empty()) {
+    json[kEqualityIds] = df.equality_ids;
+  }
+  if (df.sort_order_id.has_value()) {
+    json[kSortOrderId] = df.sort_order_id.value();
+  }
+  if (df.first_row_id.has_value()) {
+    json[kFirstRowId] = df.first_row_id.value();
+  }
+  if (df.referenced_data_file.has_value()) {
+    json[kReferencedDataFile] = df.referenced_data_file.value();
+  }
+  if (df.content_offset.has_value()) {
+    json[kContentOffset] = df.content_offset.value();
+  }
+  if (df.content_size_in_bytes.has_value()) {
+    json[kContentSizeInBytes] = df.content_size_in_bytes.value();
+  }
+
+  return json;
+}
+
+namespace {
+
+nlohmann::json BaseScanTaskResponseToJson(const BaseScanTaskResponse& 
response) {
+  nlohmann::json json;
+
+  SetContainerField(json, kPlanTasks, response.plan_tasks);
+
+  // Build delete_files array and a pointer-to-index map for reference lookup.
+  std::unordered_map<const DataFile*, int32_t> delete_file_index;
+  nlohmann::json delete_files_json = nlohmann::json::array();
+  for (size_t i = 0; i < response.delete_files.size(); ++i) {
+    if (response.delete_files[i]) {
+      delete_files_json.push_back(ToJson(*response.delete_files[i]));
+      delete_file_index[response.delete_files[i].get()] = 
static_cast<int32_t>(i);
+    }
+  }
+  if (!delete_files_json.empty()) {
+    json[kDeleteFiles] = std::move(delete_files_json);
+  }
+
+  nlohmann::json tasks_json = nlohmann::json::array();
+  for (const auto& task : response.file_scan_tasks) {
+    if (!task) continue;
+    nlohmann::json task_json;
+    if (task->data_file()) {
+      task_json[kDataFile] = ToJson(*task->data_file());
+    }
+    if (!task->delete_files().empty()) {
+      std::vector<int32_t> refs;
+      for (const auto& df : task->delete_files()) {
+        auto it = delete_file_index.find(df.get());

Review Comment:
   Using pointer identity here can silently drop delete references when 
`response.delete_files` and `task->delete_files()` hold equivalent `DataFile`s 
through different `shared_ptr`s. Please key by stable file identity, as Java 
does with delete file location, or derive the response delete list from task 
deletes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to