wgtmac commented on code in PR #687:
URL: https://github.com/apache/iceberg-cpp/pull/687#discussion_r3322704869


##########
src/iceberg/util/executor.h:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <optional>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/functional.h"
+
+namespace iceberg {
+
+using ExecutorTask = FnOnce<void()>;
+
+class ICEBERG_EXPORT Executor {
+ public:
+  virtual ~Executor() = default;
+
+  /// \brief Schedule a task for execution.
+  virtual Status Submit(ExecutorTask task) = 0;

Review Comment:
   This is a fire-and-forget `execute`-style primitive (closer to the abandoned 
P0443 `executor.execute` than to P2300's scheduler/sender model). Completion is 
tracked outside the executor, via the `std::promise`/`std::future` plumbing in 
`RunTasksParallel`. That is fine for a blocking parallel-for, but it doesn't lay 
groundwork for coroutines or `std::execution`: those need the executor to hand 
back something awaitable/composable, and the planning APIs (`PlanFiles() -> 
Result<...>`) are synchronous anyway. Going async later would mean a separate 
interface, not an extension of this one. It's worth stating in the header that 
`Executor` is a parallel-dispatch primitive, not an async scheduler.
   
   Separately: `ExecutorTask` being move-only is the right call and matches 
Arrow, but pools whose submit takes a copyable `std::function` will need 
`std::move_only_function` or a small shim to adapt.
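
   For illustration, a minimal sketch of such a shim, assuming a hypothetical 
`MakeCopyableOnce` helper (not part of this PR or of any pool's API). It parks 
the move-only task behind a `std::shared_ptr` so the resulting closure is 
copyable, and asserts that the task runs at most once:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical holder: owns the move-only task plus a "was it run" flag.
template <typename Task>
struct OnceState {
  explicit OnceState(Task t) : task(std::move(t)) {}
  Task task;
  bool invoked = false;
};

// Hypothetical helper: adapts a move-only, call-once callable to a copyable
// std::function<void()>. All copies share the same underlying task.
template <typename Task>
std::function<void()> MakeCopyableOnce(Task task) {
  auto state = std::make_shared<OnceState<Task>>(std::move(task));
  return [state]() {
    assert(!state->invoked && "call-once task invoked twice");
    state->invoked = true;
    std::move(state->task)();  // consume the task, FnOnce-style
  };
}
```

   The cost is one extra allocation per task, and the call-once contract is 
only checked in debug builds, so this is a stopgap rather than a replacement 
for a pool that accepts move-only callables directly.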



##########
src/iceberg/util/task_group.cc:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/task_group.h"
+
+#include <format>
+#include <future>
+#include <string>
+#include <utility>
+
+#include "iceberg/util/macros.h"
+
+namespace iceberg::internal {
+
+namespace {
+
+Status AggregateTaskErrors(std::vector<Error> errors) {
+  if (errors.empty()) {
+    return {};
+  }
+  if (errors.size() == 1) {
+    return std::unexpected(std::move(errors.front()));
+  }
+
+  ErrorKind kind = errors.front().kind;
+  std::string message = std::format("Task group failed with {} errors:", errors.size());
+  for (const auto& error : errors) {
+    message += std::format("\n  - {}", error.message);
+  }
+  return std::unexpected(Error{.kind = kind, .message = std::move(message)});
+}
+
+Result<std::future<Status>> SubmitTask(Executor& executor, FnOnce<Status()> task) {
+  std::promise<Status> promise;
+  auto future = promise.get_future();
+
+  ExecutorTask executor_task(
+      [promise = std::move(promise), task = std::move(task)]() mutable {
+        promise.set_value(std::move(task)());
+      });
+
+  ICEBERG_RETURN_UNEXPECTED(executor.Submit(std::move(executor_task)));
+
+  return future;
+}
+
+}  // namespace
+
+Status RunTasksSingleThreaded(std::vector<FnOnce<Status()>> tasks) {
+  std::vector<Error> errors;
+  for (auto& task : tasks) {
+    auto status = std::move(task)();
+    if (!status.has_value()) {
+      errors.push_back(std::move(status.error()));
+    }
+  }
+  return AggregateTaskErrors(std::move(errors));
+}
+
+Status RunTasksParallel(Executor& executor, std::vector<FnOnce<Status()>> tasks) {
+  std::vector<std::future<Status>> futures;
+  futures.reserve(tasks.size());
+
+  std::vector<Error> errors;
+  for (auto& task : tasks) {

Review Comment:
   All tasks are submitted up front, each with its own promise/future. For a 
handful of manifests that's fine, but as a general primitive it has no 
concurrency bound and no fail-fast: N tasks always enqueue N closures and 
allocate N futures, and if one fails early the rest still run to completion. 
Probably out of scope for this PR, but worth a note in case engines push large 
fan-outs through here.
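
   To make the fail-fast half concrete, here is a rough sketch under stated 
assumptions: `FailFastState` and `WrapFailFast` are hypothetical names, and a 
plain `bool` stands in for `Status`. Each task checks a shared flag before it 
runs, so tasks that start after the first failure are skipped. It does not 
bound concurrency and does not interrupt tasks already in flight:

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical shared state for one task group.
struct FailFastState {
  std::atomic<bool> failed{false};
  std::atomic<int> executed{0};
  std::atomic<int> skipped{0};
};

// Hypothetical wrapper: `task` returns true on success (stand-in for Status).
// The returned closure skips the task if an earlier one already failed.
inline std::function<void()> WrapFailFast(std::shared_ptr<FailFastState> state,
                                          std::function<bool()> task) {
  assert(state != nullptr && task != nullptr);
  return [state = std::move(state), task = std::move(task)]() {
    if (state->failed.load(std::memory_order_acquire)) {
      state->skipped.fetch_add(1, std::memory_order_relaxed);
      return;  // fail-fast: do not start new work after a failure
    }
    state->executed.fetch_add(1, std::memory_order_relaxed);
    if (!task()) {
      state->failed.store(true, std::memory_order_release);
    }
  };
}
```

   A concurrency bound would need something extra on top, for example 
submitting in windows or having a fixed number of workers pull from a shared 
index.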



##########
src/iceberg/update/snapshot_update.h:
##########
@@ -77,6 +79,16 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
     return self;
   }
 
+  /// \brief Configure an executor for manifest planning work.
+  ///
+  /// The executor is borrowed and must outlive this update. Planning callbacks may be
+  /// called concurrently; callers must synchronize shared mutable state captured by
+  /// those callbacks.
+  auto& ScanManifestsWith(this auto& self, Executor& executor) {

Review Comment:
   Naming: every other entry point uses `PlanWith` (TableScanBuilder, 
ExpireSnapshots, ManifestGroup, ManifestFilterManager, ManifestMergeManager), 
but this one is `ScanManifestsWith`. Worth settling on one name for a 
consistent public surface.



##########
src/iceberg/util/executor.h:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <optional>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/functional.h"
+
+namespace iceberg {
+
+using ExecutorTask = FnOnce<void()>;
+
+class ICEBERG_EXPORT Executor {
+ public:
+  virtual ~Executor() = default;
+
+  /// \brief Schedule a task for execution.
+  virtual Status Submit(ExecutorTask task) = 0;
+};
+
+using OptionalExecutor = std::optional<std::reference_wrapper<Executor>>;

Review Comment:
   `std::optional<std::reference_wrapper<Executor>>` is a little awkward to use 
(`executor_->get()` at the call sites). A plain `Executor*` expresses 
"nullable, non-owning borrow" just as well and reads cleaner everywhere it's 
passed. Minor.
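
   A small side-by-side of the two call-site shapes, using a hypothetical 
`SketchExecutor` stand-in (inline execution, `std::function` tasks) rather than 
the PR's `Executor`:

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>

// Hypothetical stand-in for the PR's Executor; Submit runs the task inline.
class SketchExecutor {
 public:
  virtual ~SketchExecutor() = default;
  virtual void Submit(std::function<void()> task) {
    ++submitted;
    task();
  }
  int submitted = 0;
};

using OptionalSketchExecutor =
    std::optional<std::reference_wrapper<SketchExecutor>>;

// optional<reference_wrapper>: unwrap the optional, then the wrapper.
inline void RunWithOptional(OptionalSketchExecutor executor,
                            std::function<void()> task) {
  assert(task != nullptr);
  if (executor.has_value()) {
    executor->get().Submit(std::move(task));
  } else {
    task();  // no executor configured: run on the calling thread
  }
}

// Raw pointer: same nullable, non-owning semantics with a single dereference.
inline void RunWithPointer(SketchExecutor* executor,
                           std::function<void()> task) {
  assert(task != nullptr);
  if (executor != nullptr) {
    executor->Submit(std::move(task));
  } else {
    task();
  }
}
```

   Callers pass `std::ref(pool)` or `std::nullopt` in the first form and 
`&pool` or `nullptr` in the second.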


