pingtimeout commented on code in PR #2180:
URL: https://github.com/apache/polaris/pull/2180#discussion_r2620062925


##########
tasks/README.md:
##########
@@ -0,0 +1,155 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Apache Polaris Asynchronous & Reliable Tasks
+
+Cluster wide task execution plus delayed and recurring scheduling, including restarts of "lost" and retries of failed
+tasks.
+
+The "caller facing" API is literally just a single `submit()` function that is called with the _kind_ of operation and
+the parameters. The returned value provides a `CompletionStage`, which can be used - it is not required to subscribe to
+it(fire and forget).
+
+## Design
+
+Task operations are called _task behaviors_.
+_Task behaviors_ define the kind of operation that happens, the uniqueness of actual task executions,
+the request and response parameter type and how retries are handled.
+
+For example, "purging a table" is a _task behavior_, but also "cleanup the backend database" can be a _task behavior_.
+The most important distinction between the two former examples is that there can be multiple "purge table" operations
+(against different tables) but only one "cleanup the backend database" task instance.
+
+A task is submitted via the `Tasks` interface, giving the `submit()` function the _task behavior_ and _task
+parameters_. The returned object contains the _task ID_ and a `CompletionStage` to which call sites can subscribe to.
+_Task functions_ can return a ("non-void") result, which can be further processed via the `CompletionState`.
+
+Tasks are a global resource, not scoped to particular realms.
+
+Task behaviors define failure/retry handling.
+Failure handling happens when the JVM that started running a task did not write the execution result.
+Such failures can happen when the executing node died or crashed.
+Retry handling happens when a task execution failed, the task behavior defines the exact behavior of such retries.
+Task behaviors may also define that a successful task is rescheduled (repeating task).
+
+Despite that even successful tasks can be rescheduled, it is **not** a goal of this framework to have a fully-fledged
+scheduling implementation with various repetition patterns (think: `cron`).
+
+All tasks have a globally unique ID, which can be deterministic (i.e., computed from task parameters) for tasks that
+depend on other values, or non-deterministic for one-off tasks. For example, a task that performs a maintenance
+operation against a single global resource must have a deterministic, unique ID to ensure that no two instances of such
+a task run concurrently. How the ID is generated is defined by the _task behavior_.
+
+The task framework ensures that no task is executed on more than one Polaris node at any time.
+Shared persisted state object is used to coordinate operations against a task.
+The former mentioned guarantee can only hold true if the wall clock is synchronized across all Polaris instances and
+as long as there is no "split brain" situation in the backend database.
+
+The task state is a composite of:
+
+* the task ID (it's unique ID)
+* the behavior ID (reference to _how_ a task behaves)
+* the task status (see below)
+* the task result (if successful)
+* the error information (if failed)
+* the next scheduled timestamp the task shall run
+* the "recheck state" timestamp (see below)
+* the "lost at" timestamp (see below)
+
+The state of a task can be one of the following:
+
+| Status    | Meaning                                                                      | Can transition to      |
+|-----------|------------------------------------------------------------------------------|------------------------|
+| `CREATED` | The task has been created but is not running anywhere nor did it run before. | `RUNNING` or deleted   |
+| `RUNNING` | The task is currently running on a node.                                     | `SUCCESS` or `FAILURE` |
+| `SUCCESS` | The task finished successfully. Its result, if present, is recorded.         | `RUNNING` or deleted   |
+| `FAILURE` | The task threw an exception. Error information, if present, is recorded.     | `RUNNING` or deleted   |
+
+Changes to a task state happen consistently.
+
+Tasks, if eligible for being executed, have their _next scheduled timestamp_ attribute set. This attribute is mandatory
+for `CREATED` tasks and optional for `SUCCESS` and `FAILURE` states.
+
+Polaris nodes regularly fetch the set of tasks that are eligible to be executed.
+
+A task to be executed gets its status changed to `RUNNING` and the attributes "recheck state not before" and "lost not
+before" both holding timestamps, set.
+"Recheck state not before" is an indicator when _other_ nodes can refresh the state of the task.
+The value effectively says that the state is not expected to change before that timestamp.
+The node executing the task either regularly updates the "recheck state not before" (and "lost not before") timestamps
+or transitions the status to `SUCCESS` or `FAILURE`.
+These updates must happen before "recheck state not before" minus an expected wall-clock-drift and internal delays.
+The "lost not before" timestamp is meant to handle the case that a node that executes a task dies.
+Another node that encounters a task with a "lost not before" timestamp in the past can pick it up for execution.
+
+A task is eligible for execution if one of the following conditions is true:
+
+* `status in (CREATED, FAILURE, SUCCESS) && scheduleNotBefore is present && scheduleNotBefore <= now()`
+  (`scheduleNotBefore` is mandatory for `CREATED`)
+* `status == RUNNING && lostNotBefore <= now()`
+
+### Optimization for locally created tasks
+
+Implementations _may_ opt to not persist the `CREATED` status for tasks that shall be executed immediately, but instead
+persist the `RUNNING` state directly. This prevents other nodes to pick up the task.
+
+## Requirements
+
+As with probably all systems that rely on wall clock, it is essential that all Polaris nodes have their wall clock being
+synchronized. Some reasonable amount of wall clock drift must be considered in every implementation.
+
+## Code structure
+
+The code is structured into multiple modules. Consuming code should almost always pull in only the API module.
+
+* `polaris-tasks-api` provides the necessary Java interfaces and immutable types.
+* `polaris-tasks-spi` behavior implementation interfaces.
+* `polaris-tasks-store` storage-implementation, persistence agnostic.
+* `polaris-tasks-store-meta` provides the storage implementation optimized for JDBC et al.
+* `polaris-tasks-store-nosql` provides the storage implementation optimized for NoSQL.
+* `polaris-tasks-impl-base` common code for client + server.
+* `polaris-tasks-impl-server` server side implementation, can submit and execute tasks.
+* `polaris-tasks-impl-client` client side implementation, can only submit but not execute tasks.
+* `polaris-tasks-impl-remote` a "execute remotely" implementation could go here.
+
+## Persona
+
+### Task requestor ("caller", "user")
+
+1. Create a _task request_ that describes the task, the behavior (think: which implementation performs the work)
+   and parameters.
+2. Submit the _task request_, optionally subscribe to the result.
+
+### Implementer
+
+* Create a _behavior ID_
+* Define task _parameter_ value type (Jackson serializable)
+* Optional: Define task _result_ value type (Jackson serializable)
+* Implement `TaskBehavior` and the `TaskFunction`
+
+### Service instance
+
+Provides a `Tasks` implementation, which can submit tasks and execute tasks.
+
+### Client instance
+
+Provides a `Tasks` implementation, which can submit tasks but not execute tasks. This kind is intended for client
+application use cases, which should not be "bothered" task executions from other (Polaris service) instances. For
+example, clients should not "purge table" tasks from all users, despite that clients cannot be expected to have access
+to all service related resources.

Review Comment:
   Should this paragraph be reworded? My English might not be the best, but I don't understand the sentence "_client application use cases, which should not be "bothered" task executions from other (Polaris service) instances_".
   
   Does it mean that client applications should not _borrow_ tasks, or something else?
   
   The sentence "_clients should not "purge table" tasks from all users, despite that clients cannot be expected to have access to all service related resources._" also sounds odd because of the "should not ... despite ... cannot" construction.
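   
   Related to the Design section of this README: the two eligibility conditions could perhaps be illustrated with a short snippet. A minimal sketch, assuming hypothetical `TaskStatus` and `TaskState` types that only mirror the README wording (they are not the classes added by this PR):
   
   ```java
   import java.time.Instant;
   import java.util.Optional;
   
   // Hypothetical types mirroring the README; not the classes added by this PR.
   enum TaskStatus { CREATED, RUNNING, SUCCESS, FAILURE }
   
   record TaskState(
       TaskStatus status, Optional<Instant> scheduleNotBefore, Optional<Instant> lostNotBefore) {
   
     /** Returns whether a node may pick up this task at {@code now}, following the README rules. */
     boolean isEligible(Instant now) {
       return switch (status) {
         // scheduleNotBefore is present && scheduleNotBefore <= now()
         case CREATED, SUCCESS, FAILURE -> scheduleNotBefore.map(ts -> !ts.isAfter(now)).orElse(false);
         // lostNotBefore <= now(): the executing node is presumed dead
         case RUNNING -> lostNotBefore.map(ts -> !ts.isAfter(now)).orElse(false);
       };
     }
   }
   ```
   
   Writing `<=` as `!ts.isAfter(now)` keeps the boundary inclusive, as in the README.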



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskBehaviorId.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/** Task behavior ID, as a type safe holder. */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTaskBehaviorId.class)
+@JsonDeserialize(as = ImmutableTaskBehaviorId.class)
+public interface TaskBehaviorId {

Review Comment:
   I suspect that the use of an `interface` here instead of a `record` is to allow extensibility, since records cannot be extended?
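   
   For comparison, a record-based variant might look like the sketch below. It is JDK only (no Immutables, no Jackson), the name `TaskBehaviorIdRecord` is hypothetical, and the blank-check is an assumed validation rule, since the body of the interface is not visible in this hunk:
   
   ```java
   import java.util.Objects;
   
   // Hypothetical record-based alternative to the Immutables interface.
   record TaskBehaviorIdRecord(String id) {
     TaskBehaviorIdRecord {
       Objects.requireNonNull(id, "id");
       // Assumed rule, for illustration only.
       if (id.isBlank()) {
         throw new IllegalStateException("task behavior ID must not be blank");
       }
     }
   }
   ```
   
   A record is implicitly `final`, so it cannot be subclassed (it can still implement interfaces). With the `interface` + `@PolarisImmutable` approach, the generated `ImmutableTaskBehaviorId` is the class that `@JsonSerialize(as = ...)` and `@JsonDeserialize(as = ...)` point at.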



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Advanced configuration options for distributed and converged task handling. */
+@ConfigMapping(prefix = "polaris.coordinated-tasks")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTasksConfiguration.class)
+@JsonDeserialize(as = ImmutableTasksConfiguration.class)
+@JsonTypeName(TasksConfiguration.TYPE_ID)
+public interface TasksConfiguration {
+  String TYPE_ID = "coordinated-tasks";
+
+  int DEFAULT_RETAINED_HISTORY_LIMIT = 5;
+
+  /** Number of history entries for task scheduling/submission updates. */
+  @WithDefault("" + DEFAULT_RETAINED_HISTORY_LIMIT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt retainedHistoryLimit();
+
+  String DEFAULT_UPDATE_STATE_INTERVAL_STRING = "PT1S";
+  Duration DEFAULT_UPDATE_STATE_INTERVAL = Duration.parse(DEFAULT_UPDATE_STATE_INTERVAL_STRING);
+
+  /**
+   * The state of running tasks is regularly updated in the task store. This parameter defines the
+   * interval of these updates.
+   */
+  @WithDefault(DEFAULT_UPDATE_STATE_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> updateStateInterval();
+
+  String DEFAULT_LOST_NOT_BEFORE_STRING = "PT5M";
+  Duration DEFAULT_LOST_NOT_BEFORE = Duration.parse(DEFAULT_LOST_NOT_BEFORE_STRING);
+
+  /**
+   * Tasks in status "running" are considered as "lost" when the last state-update was before the
+   * duration specified by this parameter.
+   *
+   * <p>Running tasks regularly update the task store with their current state (see {@code
+   * updateStateInterval}). Tasks states with a {@code lostNotBefore} in the past are considered as
+   * "dead", which means that the task's execution can be resumed by another node.
+   *
+   * <p>This value should be large enough and quite a multiple of {@code updateStateInterval}, in
+   * the range of 1 or more minutes.
+   */
+  @WithDefault(DEFAULT_LOST_NOT_BEFORE_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> lostNotBeforeDuration();
+
+  String DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING = "PT5S";
+  Duration DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL =
+      Duration.parse(DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING);
+
+  /** Interval at which nodes check for scheduled tasks that eligible to run. */
+  @WithDefault(DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> runnableTasksPollInterval();
+
+  String DEFAULT_REMOTE_TASK_POLL_INTERVAL_STRING = "PT0.5S";
+  Duration DEFAULT_REMOTE_TASK_POLL_INTERVAL =
+      Duration.parse(DEFAULT_REMOTE_TASK_POLL_INTERVAL_STRING);
+
+  /**
+   * The interval at which the state of a task that is running on another node is being polled.

Review Comment:
   ```suggestion
      * The interval at which the state of a task running on another node is polled.
   ```
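   
   Side note on the neighbouring `updateStateInterval` and `lostNotBeforeDuration` options: the relationship described in the Javadoc ("quite a multiple of `updateStateInterval`") might be easier to grasp with a small sketch. `RunningTaskLease` is a hypothetical name, not part of this PR:
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   // Hypothetical helper illustrating how lostNotBefore relates to the last state update.
   record RunningTaskLease(Instant lastStateUpdate, Duration lostNotBeforeDuration) {
   
     /** The task counts as "lost" from this instant on, unless the lease is refreshed first. */
     Instant lostNotBefore() {
       return lastStateUpdate.plus(lostNotBeforeDuration);
     }
   
     boolean isLost(Instant now) {
       return !lostNotBefore().isAfter(now);
     }
   
     /** Called by the executing node on every state update (every updateStateInterval). */
     RunningTaskLease refresh(Instant now) {
       return new RunningTaskLease(now, lostNotBeforeDuration);
     }
   
     /** How many update intervals fit into the "lost" window. */
     static long updateIntervalsBeforeLost(Duration updateStateInterval, Duration lostNotBeforeDuration) {
       return lostNotBeforeDuration.dividedBy(updateStateInterval);
     }
   }
   ```
   
   With the defaults (`PT1S` and `PT5M`), `updateIntervalsBeforeLost` yields 300, so a few delayed updates do not cause another node to take over the task.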



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+  /**
+   * The interval at which the state of a task that is running on another node is being polled.
+   *
+   * <p>Call sites that subscribe to the result of a task, which runs on a different node, use this

Review Comment:
   ```suggestion
      * <p>Call sites that subscribe to the result of a task running on a different node use this
   ```
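   
   To illustrate how such a call site could use the remote-task poll interval (default `PT0.5S`), a minimal JDK-only sketch. `RemoteTaskPoller` and the `Supplier<Optional<R>>` "fetch persisted result" hook are hypothetical; the PR's actual polling code is not part of this hunk:
   
   ```java
   import java.time.Duration;
   import java.util.Optional;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionStage;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Supplier;
   
   // Hypothetical helper, not the PR's implementation.
   final class RemoteTaskPoller {
     private final ScheduledExecutorService scheduler;
   
     RemoteTaskPoller(ScheduledExecutorService scheduler) {
       this.scheduler = scheduler;
     }
   
     /**
      * Polls the persisted task state every {@code pollInterval} until {@code fetchResult} yields a
      * value. An empty Optional means "still running on another node".
      */
     <R> CompletionStage<R> await(Supplier<Optional<R>> fetchResult, Duration pollInterval) {
       CompletableFuture<R> result = new CompletableFuture<>();
       ScheduledFuture<?> polling =
           scheduler.scheduleWithFixedDelay(
               () -> {
                 try {
                   fetchResult.get().ifPresent(result::complete);
                 } catch (RuntimeException e) {
                   result.completeExceptionally(e);
                 }
               },
               0L,
               pollInterval.toMillis(),
               TimeUnit.MILLISECONDS);
       // Stop polling as soon as the result is known or the caller cancels the stage.
       result.whenComplete((value, failure) -> polling.cancel(false));
       return result;
     }
   }
   ```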



##########
tasks/spi/src/main/java/org/apache/polaris/tasks/spi/TaskBehavior.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.spi;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.tasks.api.TaskBehaviorId;
+import org.apache.polaris.tasks.api.TaskId;
+import org.apache.polaris.tasks.api.TaskParameter;
+import org.apache.polaris.tasks.api.TaskResult;
+import org.apache.polaris.tasks.api.Tasks;
+
+/**
+ * Task behaviors provide/define how tasks behave and how tasks are handled.
+ *
+ * <p>Implementations are provided as {@link ApplicationScoped @ApplicationScoped} beans.
+ *
+ * <p>Each behavior must define its {@linkplain TaskParameter input parameter} and {@linkplain
+ * TaskResult result} types.
+ */
+public interface TaskBehavior<PARAM extends TaskParameter, RESULT extends TaskResult> {
+  /** Human-readable name. */
+  String name();
+
+  /** Globally unique ID of the task behavior. */
+  TaskBehaviorId id();
+
+  Class<PARAM> paramType();
+
+  Class<RESULT> resultType();
+
+  /**
+   * Provide a task-runnable that can perform the task behavior's operation.
+   *
+   * <p>No guarantees are made about whether CDI is available and which scope is active.
+   *
+   * <p>Implementations must <em>never</em> assume that any values or context information from a
+   * "scheduling" context (think: CDI request context, even propagated) is available. This is
+   * neither supported by the CDI specification nor practically doable, especially considering that
+   * task functions are executed "far" in the future and/or on a different node.
+   */
+  TaskFunction<PARAM, RESULT> function();
+
+  /**
+   * Generate a task ID.

Review Comment:
   ```suggestion
      * Generates a task ID.
   ```
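   
   Since the README says task IDs are either deterministic (computed from task parameters) or non-deterministic for one-off tasks, the Javadoc of this method could mention both cases. A minimal sketch of the two flavours, with hypothetical helper names (the method's signature is not visible in this hunk); the deterministic variant uses a name-based UUID from `UUID.nameUUIDFromBytes`:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.UUID;
   
   // Hypothetical helpers, not the PR's TaskBehavior API.
   final class TaskIds {
     private TaskIds() {}
   
     /** Deterministic: same behavior ID and parameter key always give the same task ID. */
     static String deterministic(String behaviorId, String parameterKey) {
       // The NUL separator keeps ("ab", "c") and ("a", "bc") from producing the same input bytes.
       byte[] input = (behaviorId + '\0' + parameterKey).getBytes(StandardCharsets.UTF_8);
       return behaviorId + ":" + UUID.nameUUIDFromBytes(input);
     }
   
     /** Non-deterministic: a fresh ID on every call, suitable for one-off tasks. */
     static String random(String behaviorId) {
       return behaviorId + ":" + UUID.randomUUID();
     }
   }
   ```
   
   A "cleanup the backend database" behavior could pass a constant parameter key, so that every submission maps to the same task ID.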



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskParameter.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Base interface for task parameter objects. This is the necessary base type for Jackson type
+ * polymorphism.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
+public interface TaskParameter {

Review Comment:
   When I first reviewed this PR, I had the impression that this class represented a single parameter of a given task, and that a task with multiple parameters would have multiple `TaskParameter` objects.
   
   Now I think that each task behaviour can have only one `TaskParameter` and that, despite its singular name, the `TaskParameter` class holds *all* the parameters of a given task, i.e. it is the JSON root of an object containing all parameters. If so, would it make sense to update the documentation to reflect that?
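   
   If that reading is right, a small example in the Javadoc might help. A minimal sketch of one parameter object carrying every input of a hypothetical "purge table" behavior; `TaskParameter` is re-declared locally without the Jackson annotations, and the field names are made up:
   
   ```java
   import java.util.List;
   import java.util.Objects;
   
   // Local stand-in for org.apache.polaris.tasks.api.TaskParameter (Jackson annotations omitted).
   interface TaskParameter {}
   
   /** All inputs of the hypothetical "purge table" behavior live in this one object. */
   record PurgeTableParameter(List<String> namespace, String tableName, boolean purgeData)
       implements TaskParameter {
     PurgeTableParameter {
       namespace = List.copyOf(namespace); // defensive, immutable copy
       Objects.requireNonNull(tableName, "tableName");
     }
   }
   ```
   
   Serialized, this would be a single JSON object with `namespace`, `tableName` and `purgeData` fields, i.e. the "JSON root" of all parameters.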



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskParameterTypeId.java:
##########
@@ -0,0 +1,39 @@
+package org.apache.polaris.tasks.api;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import jakarta.annotation.Nonnull;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Provide the {@linkplain TaskBehaviorId task behavior ID} using the

Review Comment:
   ```suggestion
    * Provides the {@linkplain TaskBehaviorId task behavior ID} using the
   ```
   
   Is this sentence correct? This Javadoc is identical to that of `TaskResultTypeId`, but neither class uses, nor is used by, the `TaskBehaviorId` class at all.
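   
   For what it's worth, `TaskSerializationResolver` resolves the task parameter *type* from `TaskParameterTypeId#value()`, so the annotation seems to carry a type ID rather than a behavior ID. A minimal sketch of that mechanism, using a locally re-declared, hypothetically named `ParameterTypeId` annotation; the `RUNTIME` retention and `TYPE` target are assumed from the imports in this hunk:
   
   ```java
   import java.lang.annotation.Documented;
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;
   import java.util.Optional;
   
   // Hypothetical re-declaration; settings assumed from the imports of TaskParameterTypeId.
   @Documented
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.TYPE)
   @interface ParameterTypeId {
     String value();
   }
   
   // Hypothetical parameter type, tagged with its type ID.
   @ParameterTypeId("purge-table")
   record PurgeTableParams(String tableName) {}
   
   final class TypeIds {
     private TypeIds() {}
   
     /** Reads the type ID from the annotation, or empty if the class is not annotated. */
     static Optional<String> typeIdOf(Class<?> type) {
       return Optional.ofNullable(type.getAnnotation(ParameterTypeId.class)).map(ParameterTypeId::value);
     }
   }
   ```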



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskSerializationResolver.java:
##########
@@ -0,0 +1,37 @@
+package org.apache.polaris.tasks.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+
+/**
+ * Interface needed by {@link TaskParameterTypeIdResolver} and {@link TaskParameterTypeIdResolver}
+ * to resolve type-IDs to Java types. This is used in/with CDI.
+ */
+public interface TaskSerializationResolver {
+
+  /**
+   * Resolve the task parameter type using the ID defined in {@link TaskParameterTypeId#value()}.

Review Comment:
   ```suggestion
      * Resolves the task parameter type using the ID defined in {@link TaskParameterTypeId#value()}.
      *
      * @param parameterTypeId the task parameter type ID
      * @return the task parameter class, or an empty optional if not found
   ```
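   
   A trivial, map-backed implementation may help to illustrate the suggested `@return` contract. This is a hypothetical, JDK-only sketch (the Javadoc says the real interface is used with CDI), and `ParameterTypeResolver` mirrors only the single method shown in this hunk:
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   
   // Hypothetical interface mirroring only the method shown in this hunk.
   interface ParameterTypeResolver {
     Optional<Class<?>> resolveParameterType(String parameterTypeId);
   }
   
   /** Registry-backed sketch, without CDI. */
   final class MapParameterTypeResolver implements ParameterTypeResolver {
     private final Map<String, Class<?>> types;
   
     MapParameterTypeResolver(Map<String, ? extends Class<?>> types) {
       this.types = Map.copyOf(types);
     }
   
     @Override
     public Optional<Class<?>> resolveParameterType(String parameterTypeId) {
       // Unknown IDs yield an empty Optional rather than an exception.
       return Optional.ofNullable(types.get(parameterTypeId));
     }
   }
   ```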



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/Tasks.java:
##########
@@ -0,0 +1,66 @@
+package org.apache.polaris.tasks.api;
+
+/**
+ * User-facing task submission API.
+ *
+ * <p>This interface intentionally only provides a function to submit tasks.
+ */
+public interface Tasks {
+  /**
+   * Submit the given task request for execution.
+   *
+   * <p>This API only guarantees that, upon successful execution, the task request will eventually
+   * be executed. When exactly and where exactly, think: system process, the task will be executed
+   * depends on configuration specifics and task state.

Review Comment:
   Multi-line suggestion for clarity:
   
   ```
      * <p>This API only guarantees that, upon successful return from this method, the task request
      * will eventually be executed. The exact timing and location (i.e., which system process) of the
      * task execution depends on configuration specifics and task state.
   ```
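   
   It might also help to show both call patterns from the README (fire and forget vs. subscribing). A minimal sketch with simplified, hypothetical types: the real `submit` takes `(TaskBehaviorId, String, TaskParameter, Class)` according to the `TaskSubmission` Javadoc, and `ImmediateTasks` is an in-memory stand-in that completes right away by echoing the parameter back as the result:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionStage;
   
   // Hypothetical, simplified shapes; not the PR's API.
   interface Submission<R> {
     String taskId();
   
     CompletionStage<R> completionStage();
   }
   
   interface SimpleTasks {
     <R> Submission<R> submit(String behaviorId, Object parameter, Class<R> resultType);
   }
   
   /** In-memory stand-in: "executes" immediately and echoes the parameter as the result. */
   final class ImmediateTasks implements SimpleTasks {
     private int counter;
   
     @Override
     public <R> Submission<R> submit(String behaviorId, Object parameter, Class<R> resultType) {
       String id = behaviorId + "-" + (++counter);
       CompletableFuture<R> done = CompletableFuture.completedFuture(resultType.cast(parameter));
       return new Submission<>() {
         @Override
         public String taskId() {
           return id;
         }
   
         @Override
         public CompletionStage<R> completionStage() {
           return done;
         }
       };
     }
   }
   
   final class Callers {
     /** Fire and forget: keep the task ID, ignore the completion stage. */
     static String fireAndForget(SimpleTasks tasks) {
       return tasks.submit("cleanup-backend", "all", String.class).taskId();
     }
   
     /** Subscribe: react once the (non-repeating) task has finished. */
     static CompletionStage<String> subscribe(SimpleTasks tasks, String table) {
       return tasks
           .submit("purge-table", table, String.class)
           .completionStage()
           .thenApply(result -> "purged " + result);
     }
   }
   ```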



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/Tasks.java:
##########
@@ -0,0 +1,66 @@
+  /**
+   * Submit the given task request for execution.

Review Comment:
   ```suggestion
      * Submits the given task request for execution.
   ```



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskSubmission.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.polaris.tasks.api;
+
+import java.util.concurrent.CompletionStage;
+
+/**
+ * Represents the result of a {@linkplain Tasks#submit(TaskBehaviorId, String, TaskParameter, Class)
+ * task submission}.
+ *
+ * @param <RESULT> task execution result type.
+ */
+public interface TaskSubmission<RESULT> extends AutoCloseable {
+  TaskId taskId();
+
+  /**
+   * Use this completion stage to subscribe to the result of a task execution, do not use this when
+   * the task execution is about to happen "far in the future" or is a recurring task.
+   *
+   * <p>It only makes sense to subscribe to a task execution of a non-repeating task. The behavior
+   * of the returned completion stage is undefined for re-scheduled tasks. Implementations may
+   * return the result of the "current"/next task run or not yield a result as long as the task gets
+   * rescheduled.
+   */
+  CompletionStage<RESULT> completionStage();

Review Comment:
   Suggestion (multi-line, so I cannot offer it directly in GitHub):
   
   ```
     /**
      * Returns a completion stage to subscribe to the result of a task execution.
      *
      * <p>This should only be used for tasks that will execute in the near future. Do not use this for
      * tasks scheduled far in the future or for recurring tasks.
      *
      * <p>It only makes sense to subscribe to the execution of a non-repeating task. The behavior of
      * the returned completion stage is undefined for rescheduled tasks. Implementations may return
      * the result of the current or next task run, or may not yield a result as long as the task gets
      * rescheduled.
      */
   ```
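The sketch below shows the two usage modes described for `TaskSubmission`: fire and forget, and subscribing to a near-term, one-shot result. `Submission` is a hypothetical stand-in that only mirrors the `taskId()`/`completionStage()`/`AutoCloseable` shape in this diff. `submitOneShot()` is invented for illustration. Neither is the Polaris API. The sketch also bounds the wait with `orTimeout()`, because a subscriber should never block indefinitely on a task that may be deferred.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

class SubmissionDemo {
  // Hypothetical stand-in mirroring the TaskSubmission shape above; not the Polaris API.
  interface Submission<R> extends AutoCloseable {
    String taskId();

    CompletionStage<R> completionStage();

    @Override
    void close(); // narrowed so callers need not handle a checked exception
  }

  // Hypothetical submit(): models a one-shot task that runs in the near future.
  static Submission<String> submitOneShot(String id) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "purged:" + id);
    return new Submission<>() {
      @Override
      public String taskId() {
        return id;
      }

      @Override
      public CompletionStage<String> completionStage() {
        return future;
      }

      @Override
      public void close() {
        // Nothing to release in this sketch.
      }
    };
  }

  // Subscribing: only sensible for near-term, non-repeating tasks, and always with a bounded wait.
  static String awaitResult(String id) {
    try (Submission<String> submission = submitOneShot(id)) {
      return submission.completionStage().toCompletableFuture().orTimeout(5, TimeUnit.SECONDS).join();
    }
  }

  public static void main(String[] args) {
    // Fire and forget: submit, ignore the completion stage, release the handle.
    submitOneShot("table-a").close();
    System.out.println(awaitResult("table-b")); // purged:table-b
  }
}
```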
   



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Advanced configuration options for distributed and converged task handling. */
+@ConfigMapping(prefix = "polaris.coordinated-tasks")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTasksConfiguration.class)
+@JsonDeserialize(as = ImmutableTasksConfiguration.class)
+@JsonTypeName(TasksConfiguration.TYPE_ID)
+public interface TasksConfiguration {
+  String TYPE_ID = "coordinated-tasks";
+
+  int DEFAULT_RETAINED_HISTORY_LIMIT = 5;
+
+  /** Number of history entries for task scheduling/submission updates. */
+  @WithDefault("" + DEFAULT_RETAINED_HISTORY_LIMIT)

Review Comment:
   What is the rationale for `DEFAULT_RETAINED_HISTORY_LIMIT` being an `int` 
instead of a `String`, and then converted to a string using a `"" + ` 
concatenation?  
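The author has not confirmed a rationale, but one plausible reading is this. An `int` constant gives code a typed default to use directly, for example in `OptionalInt.orElse(...)`. Meanwhile `"" + CONSTANT` is still a compile-time constant expression (JLS §15.29), so it remains a legal `String` annotation value. The sketch uses a hypothetical `@Default` annotation in place of SmallRye's `@WithDefault` and reads the folded value back through reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class ConstantDefaultDemo {
  // Hypothetical stand-in for a String-valued default annotation such as SmallRye's @WithDefault.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Default {
    String value();
  }

  interface Config {
    // Typed constant: usable wherever code needs an int, e.g. limit.orElse(DEFAULT_LIMIT).
    int DEFAULT_LIMIT = 5;

    // "" + DEFAULT_LIMIT is a compile-time constant expression, so javac accepts it as an
    // annotation value and folds it to the String "5".
    @Default("" + DEFAULT_LIMIT)
    int limit();
  }

  static String defaultOf(String methodName) {
    try {
      return Config.class.getMethod(methodName).getAnnotation(Default.class).value();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("no such config method: " + methodName, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(defaultOf("limit")); // 5
  }
}
```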



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/Tasks.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+/**
+ * User-facing task submission API.
+ *
+ * <p>This interface intentionally only provides a function to submit tasks.
+ */
+public interface Tasks {
+  /**
+   * Submit the given task request for execution.
+   *
+   * <p>This API only guarantees that, upon successful execution, the task request will eventually
+   * be executed. When exactly and where exactly, think: system process, the task will be executed
+   * depends on configuration specifics and task state.
+   *
+   * <p>Task executions are uniquely identified by their {@link TaskId}, which are generated by the
+   * task behavior implementation. Task behavior implementations produce either new, unique task IDs
+   * per submission or deterministic task IDs based on the task parameters.
+   *
+   * <p>Task behavior implementations may request an initial delay of a task execution and also
+   * define optional retry behaviors for successful and failed executions.
+   *
+   * <p>Once this function returns successfully, the task execution request is durably persisted.
+   *
+   * <p>Implementations provide a best-effort approach, offering at-least-once semantics.
+   *
+   * <p>This API provides no guarantee when a task will be executed. Implementations may defer
+   * execution to any point in time in the future.
+   *
+   * <p>This API offers no task priorities. When tasks are executed is determined by the
+   * implementation.
+   *
+   * @param <PARAM> task input parameter
+   * @param <RESULT> task execution result, if successful
+   * @param behaviorId ID of task behavior being submitted. An {@link IllegalArgumentException} is
+   *     thrown if no behavior for this ID is registered.
+   * @param realmId realm ID for which the task is submitted
+   * @param param task parameter value, task behavior implementations derive the actual task ID
+   *     based on their implementation and potentially from this task parameter value. The caller is
+   *     responsible for providing a correct, compatible parameter value for the given task
+   *     behavior.
+   * @param resultType type of the result value object. The caller is responsible for providing the
+   *     correct, compatible result type for the given task behavior.
+   * @return asynchronous execution result

Review Comment:
   Multi-line suggestion (distinguish `<PARAM>` and `<RESULT>` as types):
   
   ```
      * @param <PARAM> task input parameter type
      * @param <RESULT> task execution result type
      * @param behaviorId ID of task behavior being submitted. An {@link IllegalArgumentException} is
      *     thrown if no behavior for this ID is registered.
      * @param realmId realm ID for which the task is submitted
      * @param param task parameter value. Task behavior implementations derive the actual task ID
      *     based on their implementation and potentially from this task parameter value. The caller is
      *     responsible for providing a correct, compatible parameter value for the given task
      *     behavior.
      * @param resultType type of the result value object. The caller is responsible for providing the
      *     correct, compatible result type for the given task behavior.
      * @return the task submission handle containing the task ID
   ```
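The `resultType` contract ("the caller is responsible for providing the correct, compatible result type") can be illustrated with a sketch. It assumes the framework holds only an untyped, deserialized result object. In that case all it can do is `Class.cast()` to the caller-supplied type, so a wrong `resultType` surfaces as a `ClassCastException` at runtime rather than at compile time. `PurgeResult`, `CleanupResult` and `typedResult()` are hypothetical names.

```java
class ResultTypeDemo {
  // Hypothetical result types: the caller must name the one the behavior actually produces.
  record PurgeResult(int filesDeleted) {}

  record CleanupResult(boolean done) {}

  // Stand-in for a result that was deserialized without static type information.
  static Object deserializedResult() {
    return new PurgeResult(42);
  }

  // All the framework can do with the caller-supplied Class is a checked runtime cast.
  static <R> R typedResult(Object raw, Class<R> resultType) {
    return resultType.cast(raw);
  }

  public static void main(String[] args) {
    PurgeResult ok = typedResult(deserializedResult(), PurgeResult.class);
    System.out.println(ok.filesDeleted()); // 42
    try {
      typedResult(deserializedResult(), CleanupResult.class);
    } catch (ClassCastException e) {
      System.out.println("incompatible resultType rejected at runtime");
    }
  }
}
```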



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Advanced configuration options for distributed and converged task handling. */
+@ConfigMapping(prefix = "polaris.coordinated-tasks")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTasksConfiguration.class)
+@JsonDeserialize(as = ImmutableTasksConfiguration.class)
+@JsonTypeName(TasksConfiguration.TYPE_ID)
+public interface TasksConfiguration {
+  String TYPE_ID = "coordinated-tasks";
+
+  int DEFAULT_RETAINED_HISTORY_LIMIT = 5;
+
+  /** Number of history entries for task scheduling/submission updates. */
+  @WithDefault("" + DEFAULT_RETAINED_HISTORY_LIMIT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt retainedHistoryLimit();
+
+  String DEFAULT_UPDATE_STATE_INTERVAL_STRING = "PT1S";
+  Duration DEFAULT_UPDATE_STATE_INTERVAL = Duration.parse(DEFAULT_UPDATE_STATE_INTERVAL_STRING);
+
+  /**
+   * The state of running tasks is regularly updated in the task store. This parameter defines the
+   * interval of these updates.
+   */
+  @WithDefault(DEFAULT_UPDATE_STATE_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> updateStateInterval();
+
+  String DEFAULT_LOST_NOT_BEFORE_STRING = "PT5M";
+  Duration DEFAULT_LOST_NOT_BEFORE = Duration.parse(DEFAULT_LOST_NOT_BEFORE_STRING);
+
+  /**
+   * Tasks in status "running" are considered as "lost" when the last state-update was before the
+   * duration specified by this parameter.
+   *
+   * <p>Running tasks regularly update the task store with their current state (see {@code
+   * updateStateInterval}). Tasks states with a {@code lostNotBefore} in the past are considered as
+   * "dead", which means that the task's execution can be resumed by another node.
+   *
+   * <p>This value should be large enough and quite a multiple of {@code updateStateInterval}, in
+   * the range of 1 or more minutes.
+   */
+  @WithDefault(DEFAULT_LOST_NOT_BEFORE_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> lostNotBeforeDuration();
+
+  String DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING = "PT5S";
+  Duration DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL =
+      Duration.parse(DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING);
+
+  /** Interval at which nodes check for scheduled tasks that eligible to run. */
+  @WithDefault(DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> runnableTasksPollInterval();
+
+  String DEFAULT_REMOTE_TASK_POLL_INTERVAL_STRING = "PT0.5S";
+  Duration DEFAULT_REMOTE_TASK_POLL_INTERVAL =
+      Duration.parse(DEFAULT_REMOTE_TASK_POLL_INTERVAL_STRING);
+
+  /**
+   * The interval at which the state of a task that is running on another node is being polled.
+   *
+   * <p>Call sites that subscribe to the result of a task, which runs on a different node, use this
+   * value as the polling interval.
+   *
+   * <p>This parameter should be lower than the duration defined by the parameter {@code
+   * updateStateInterval}.
+   */
+  @WithDefault(DEFAULT_REMOTE_TASK_POLL_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> remoteTaskPollInterval();
+
+  String DEFAULT_LOCAL_SCHEDULING_INTERVAL_STRING = "PT2S";
+  Duration DEFAULT_LOCAL_SCHEDULING_INTERVAL =
+      Duration.parse(DEFAULT_LOCAL_SCHEDULING_INTERVAL_STRING);
+
+  /**
+   * New submitted that are scheduled with this interval will be scheduled for non-local execution,

Review Comment:
   ```suggestion
      * Newly submitted tasks that are scheduled with this interval will be scheduled for non-local execution,
   ```
   
   I am a bit puzzled by this parameter.  I could not find any description of a 
guarantee for non-local execution in the readme.  And given that the 
implementations will be submitted in subsequent PRs, it is difficult for me to 
grasp the concept described here.  Should either the javadoc or the readme be 
updated to provide more information?
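The `TasksConfiguration` javadoc in this hunk documents relationships between the intervals, and these can be checked mechanically. The remote poll interval should be lower than `updateStateInterval`. The lost-detection duration should be a large multiple of `updateStateInterval` and roughly a minute or more. The sketch below assumes a hypothetical `check()` helper. The 10x factor is an assumption, because the javadoc only asks for "quite a multiple". The defaults are the ones declared above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class TasksConfigCheck {
  // Defaults as declared in TasksConfiguration.
  static final Duration UPDATE_STATE_INTERVAL = Duration.parse("PT1S");
  static final Duration LOST_NOT_BEFORE = Duration.parse("PT5M");
  static final Duration REMOTE_TASK_POLL_INTERVAL = Duration.parse("PT0.5S");

  // Assumption: the javadoc only says "quite a multiple"; 10x is an illustrative threshold.
  static final long MIN_LOST_FACTOR = 10;

  static List<String> check(Duration updateState, Duration lostNotBefore, Duration remotePoll) {
    List<String> problems = new ArrayList<>();
    if (remotePoll.compareTo(updateState) >= 0) {
      problems.add("remoteTaskPollInterval should be lower than updateStateInterval");
    }
    if (lostNotBefore.compareTo(updateState.multipliedBy(MIN_LOST_FACTOR)) < 0) {
      problems.add("lostNotBeforeDuration should be a large multiple of updateStateInterval");
    }
    if (lostNotBefore.compareTo(Duration.ofMinutes(1)) < 0) {
      problems.add("lostNotBeforeDuration should be 1 minute or more");
    }
    return problems;
  }

  public static void main(String[] args) {
    System.out.println(check(UPDATE_STATE_INTERVAL, LOST_NOT_BEFORE, REMOTE_TASK_POLL_INTERVAL)); // []
    System.out.println(
        check(Duration.ofSeconds(10), Duration.ofSeconds(30), Duration.ofSeconds(20)).size()); // 3
  }
}
```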



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskResultTypeId.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import jakarta.annotation.Nonnull;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Provide the {@linkplain TaskBehaviorId task behavior ID} using the

Review Comment:
   ```suggestion
    * Provides the {@linkplain TaskBehaviorId task behavior ID} using the
   ```
   
   Same question as before: Is this sentence correct?  This javadoc is 
identical to that of `TaskParameterTypeId`, but neither class touches the 
`TaskBehaviorId` class at all.



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TaskSerializationResolver.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+
+/**
+ * Interface needed by {@link TaskParameterTypeIdResolver} and {@link TaskParameterTypeIdResolver}
+ * to resolve type-IDs to Java types. This is used in/with CDI.
+ */
+public interface TaskSerializationResolver {
+
+  /**
+   * Resolve the task parameter type using the ID defined in {@link TaskParameterTypeId#value()}.
+   */
+  Optional<Class<? extends TaskParameter>> taskParameterType(@Nonnull String parameterTypeId);
+
+  /** Resolve the task result type using the ID defined in {@link TaskResultTypeId#value()}. */

Review Comment:
   ```suggestion
     /**
      * Resolves the task result type using the ID defined in {@link TaskResultTypeId#value()}.
      *
      * @param resultTypeId the task result type ID
      * @return the task result class, or an empty optional if not found
      */
   ```
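The following is a map-backed sketch of the lookup these javadocs describe. It assumes hypothetical stand-ins `Result` and `@ResultTypeId` for `TaskResult` and `@TaskResultTypeId`. Unknown IDs yield an empty `Optional`, matching the suggested `@return` wording, and duplicate IDs are rejected. The PR does not show how the CDI-based implementation discovers candidate classes, so here they are simply passed in.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class ResultTypeResolverSketch {
  // Hypothetical stand-ins for TaskResult and @TaskResultTypeId.
  interface Result {}

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface ResultTypeId {
    String value();
  }

  @ResultTypeId("purge-result")
  static final class PurgeResult implements Result {}

  @ResultTypeId("cleanup-result")
  static final class CleanupResult implements Result {}

  private final Map<String, Class<? extends Result>> byId = new HashMap<>();

  // Candidate classes are passed in; a CDI-based implementation would discover them instead.
  ResultTypeResolverSketch(List<Class<? extends Result>> candidates) {
    for (Class<? extends Result> type : candidates) {
      ResultTypeId id = type.getAnnotation(ResultTypeId.class);
      if (id == null) {
        throw new IllegalArgumentException(type.getName() + " lacks @ResultTypeId");
      }
      if (byId.putIfAbsent(id.value(), type) != null) {
        throw new IllegalStateException("duplicate result type ID: " + id.value());
      }
    }
  }

  static ResultTypeResolverSketch withDefaults() {
    return new ResultTypeResolverSketch(
        List.<Class<? extends Result>>of(PurgeResult.class, CleanupResult.class));
  }

  /** Returns the class registered for the ID, or an empty optional if the ID is unknown. */
  Optional<Class<? extends Result>> resultType(String resultTypeId) {
    return Optional.ofNullable(byId.get(resultTypeId));
  }

  public static void main(String[] args) {
    ResultTypeResolverSketch resolver = withDefaults();
    System.out.println(
        resolver.resultType("purge-result").map(Class::getSimpleName).orElse("none")); // PurgeResult
    System.out.println(resolver.resultType("unknown").isPresent()); // false
  }
}
```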



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Advanced configuration options for distributed and converged task handling. */
+@ConfigMapping(prefix = "polaris.coordinated-tasks")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTasksConfiguration.class)
+@JsonDeserialize(as = ImmutableTasksConfiguration.class)
+@JsonTypeName(TasksConfiguration.TYPE_ID)
+public interface TasksConfiguration {
+  String TYPE_ID = "coordinated-tasks";
+
+  int DEFAULT_RETAINED_HISTORY_LIMIT = 5;
+
+  /** Number of history entries for task scheduling/submission updates. */
+  @WithDefault("" + DEFAULT_RETAINED_HISTORY_LIMIT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt retainedHistoryLimit();
+
+  String DEFAULT_UPDATE_STATE_INTERVAL_STRING = "PT1S";
+  Duration DEFAULT_UPDATE_STATE_INTERVAL = Duration.parse(DEFAULT_UPDATE_STATE_INTERVAL_STRING);
+
+  /**
+   * The state of running tasks is regularly updated in the task store. This parameter defines the
+   * interval of these updates.
+   */
+  @WithDefault(DEFAULT_UPDATE_STATE_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> updateStateInterval();
+
+  String DEFAULT_LOST_NOT_BEFORE_STRING = "PT5M";
+  Duration DEFAULT_LOST_NOT_BEFORE = Duration.parse(DEFAULT_LOST_NOT_BEFORE_STRING);
+
+  /**
+   * Tasks in status "running" are considered as "lost" when the last state-update was before the

Review Comment:
   ```suggestion
      * Tasks in "running" status are considered "lost" when the last state-update was before the
   ```
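Here is a sketch of the "lost" rule as the javadoc states it: a running task is lost once `lastUpdate + lostNotBeforeDuration` lies in the past. It assumes a hypothetical state snapshot holding only a status and a last-update timestamp, because the real `TaskState` fields are not part of this hunk. The `Status` values are invented. With the `PT5M` default and a `PT1S` update interval, a healthy node refreshes its timestamp long before that point.

```java
import java.time.Duration;
import java.time.Instant;

class LostTaskCheck {
  // Hypothetical status values; the real enum is not shown in this hunk.
  enum Status { RUNNING, SUCCESS, FAILURE }

  // Hypothetical snapshot of a task state row.
  record State(Status status, Instant lastUpdate) {}

  // A running task is lost once lastUpdate + lostNotBeforeDuration lies strictly before 'now'.
  static boolean isLost(State state, Duration lostNotBeforeDuration, Instant now) {
    if (state.status() != Status.RUNNING) {
      return false; // only running tasks can be lost
    }
    Instant lostNotBefore = state.lastUpdate().plus(lostNotBeforeDuration);
    return lostNotBefore.isBefore(now);
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2025-01-01T12:00:00Z");
    Duration lostAfter = Duration.parse("PT5M"); // default from TasksConfiguration
    System.out.println(isLost(new State(Status.RUNNING, now.minusSeconds(30)), lostAfter, now)); // false
    System.out.println(isLost(new State(Status.RUNNING, now.minusSeconds(600)), lostAfter, now)); // true
  }
}
```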



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Advanced configuration options for distributed and converged task handling. */
+@ConfigMapping(prefix = "polaris.coordinated-tasks")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTasksConfiguration.class)
+@JsonDeserialize(as = ImmutableTasksConfiguration.class)
+@JsonTypeName(TasksConfiguration.TYPE_ID)
+public interface TasksConfiguration {
+  String TYPE_ID = "coordinated-tasks";
+
+  int DEFAULT_RETAINED_HISTORY_LIMIT = 5;
+
+  /** Number of history entries for task scheduling/submission updates. */
+  @WithDefault("" + DEFAULT_RETAINED_HISTORY_LIMIT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt retainedHistoryLimit();
+
+  String DEFAULT_UPDATE_STATE_INTERVAL_STRING = "PT1S";
+  Duration DEFAULT_UPDATE_STATE_INTERVAL = Duration.parse(DEFAULT_UPDATE_STATE_INTERVAL_STRING);
+
+  /**
+   * The state of running tasks is regularly updated in the task store. This parameter defines the
+   * interval of these updates.
+   */
+  @WithDefault(DEFAULT_UPDATE_STATE_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> updateStateInterval();
+
+  String DEFAULT_LOST_NOT_BEFORE_STRING = "PT5M";
+  Duration DEFAULT_LOST_NOT_BEFORE = Duration.parse(DEFAULT_LOST_NOT_BEFORE_STRING);
+
+  /**
+   * Tasks in status "running" are considered as "lost" when the last state-update was before the
+   * duration specified by this parameter.
+   *
+   * <p>Running tasks regularly update the task store with their current state (see {@code
+   * updateStateInterval}). Tasks states with a {@code lostNotBefore} in the past are considered as
+   * "dead", which means that the task's execution can be resumed by another node.
+   *
+   * <p>This value should be large enough and quite a multiple of {@code updateStateInterval}, in
+   * the range of 1 or more minutes.
+   */
+  @WithDefault(DEFAULT_LOST_NOT_BEFORE_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> lostNotBeforeDuration();
+
+  String DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING = "PT5S";
+  Duration DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL =
+      Duration.parse(DEFAULT_RUNNABLE_TASKS_POLL_INTERVAL_STRING);
+
+  /** Interval at which nodes check for scheduled tasks that eligible to run. */

Review Comment:
   ```suggestion
     /** Interval at which nodes check for scheduled tasks that are eligible to run. */
   ```
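The sketch below shows how a node might poll at `runnableTasksPollInterval` using a plain `ScheduledExecutorService`. The actual implementation arrives in later PRs, so this is an assumption about its shape, not a description of it. It uses `scheduleWithFixedDelay` so that a slow poll never overlaps the next one. It also catches exceptions, because the `ScheduledExecutorService` contract suppresses all later runs of a periodic task once an execution throws. `pollTimes()` is a hypothetical, test-friendly helper that waits for a fixed number of polls. In production the interval would be the `PT5S` default rather than milliseconds.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RunnableTasksPoller {
  // Runs pollOnce every pollInterval until 'times' polls completed (or 10s passed);
  // returns how many polls ran, which may exceed 'times' by one before shutdown.
  static int pollTimes(Duration pollInterval, int times, Runnable pollOnce) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch done = new CountDownLatch(times);
    AtomicInteger polls = new AtomicInteger();
    scheduler.scheduleWithFixedDelay(
        () -> {
          try {
            pollOnce.run(); // e.g. ask the task store for tasks whose schedule time has passed
          } catch (RuntimeException e) {
            // An escaping exception would suppress all later runs, so report and continue.
            System.err.println("poll failed: " + e);
          }
          polls.incrementAndGet();
          done.countDown();
        },
        0,
        pollInterval.toMillis(),
        TimeUnit.MILLISECONDS);
    try {
      done.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return polls.get();
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    int polls =
        pollTimes(
            Duration.ofMillis(20),
            3,
            () -> {
              if (calls.incrementAndGet() == 2) {
                throw new IllegalStateException("task store unavailable");
              }
            });
    System.out.println(polls >= 3); // true: the failing second poll did not stop the schedule
  }
}
```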



##########
tasks/spi/src/main/java/org/apache/polaris/tasks/spi/TaskBehavior.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.spi;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.tasks.api.TaskBehaviorId;
+import org.apache.polaris.tasks.api.TaskId;
+import org.apache.polaris.tasks.api.TaskParameter;
+import org.apache.polaris.tasks.api.TaskResult;
+import org.apache.polaris.tasks.api.Tasks;
+
+/**
+ * Task behaviors provide/define how tasks behave and how tasks are handled.
+ *
+ * <p>Implementations are provided as {@link ApplicationScoped @ApplicationScoped} beans.
+ *
+ * <p>Each behavior must define its {@linkplain TaskParameter input parameter} and {@linkplain
+ * TaskResult result} types.
+ */
+public interface TaskBehavior<PARAM extends TaskParameter, RESULT extends TaskResult> {
+  /** Human-readable name. */
+  String name();
+
+  /** Globally unique ID of the task behavior. */
+  TaskBehaviorId id();
+
+  Class<PARAM> paramType();
+
+  Class<RESULT> resultType();
+
+  /**
+   * Provide a task-runnable that can perform the task behavior's operation.
+   *
+   * <p>No guarantees are made about whether CDI is available and which scope is active.
+   *
+   * <p>Implementations must <em>never</em> assume that any values or context information from a
+   * "scheduling" context (think: CDI request context, even propagated) is available. This is
+   * neither supported by the CDI specification nor practically doable, especially considering that
+   * task functions are executed "far" in the future and/or on a different node.
+   */
+  TaskFunction<PARAM, RESULT> function();
+
+  /**
+   * Generate a task ID.
+   *
+   * <p>Task behavior implementations produce either new, globally unique task IDs per submission or
+   * deterministic task IDs based on the task parameters.
+   *
+   * <p>Implementations must include the given {@code realmId} when generating or calculating a task
+   * ID.

Review Comment:
   Is this true 100% of the time? Or could there be cross-realm tasks (like overall Polaris cleanup tasks) that are realm-independent? If so, `must` should be replaced by `should`.
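The sketch below illustrates both ID strategies from the javadoc and the realm question raised above. It assumes task IDs are plain strings, since the real `TaskId` type is not shown here, and it uses `UUID.nameUUIDFromBytes` as one possible deterministic derivation. Including `realmId` keeps identical work in different realms apart. The hypothetical `globalTaskId()` shows what a realm-independent singleton (such as "cleanup the backend database" from the README) might look like if `must` were relaxed.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class TaskIdSketch {
  // New ID per submission: submitting the same parameters twice yields two independent tasks.
  static String uniqueTaskId(String realmId) {
    return realmId + ":" + UUID.randomUUID();
  }

  // Deterministic ID: the same realm, behavior and parameters always map to the same ID,
  // so a repeated submission of identical work can be recognized as a duplicate.
  static String deterministicTaskId(String realmId, String behavior, String paramKey) {
    return realmId + ":" + nameUuid(realmId, behavior, paramKey);
  }

  // Hypothetical realm-independent ID for a single cluster-wide task.
  static String globalTaskId(String behavior) {
    return "global:" + nameUuid(behavior);
  }

  // Joins the parts with a NUL separator so ("ab", "c") and ("a", "bc") do not collide.
  private static UUID nameUuid(String... parts) {
    return UUID.nameUUIDFromBytes(String.join("\0", parts).getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    String a = deterministicTaskId("realm-1", "purge-table", "ns.tbl");
    String b = deterministicTaskId("realm-1", "purge-table", "ns.tbl");
    String c = deterministicTaskId("realm-2", "purge-table", "ns.tbl");
    System.out.println(a.equals(b)); // true: same realm and parameters
    System.out.println(a.equals(c)); // false: the realm is part of the ID
    System.out.println(uniqueTaskId("realm-1").equals(uniqueTaskId("realm-1"))); // false
  }
}
```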



##########
tasks/api/src/main/java/org/apache/polaris/tasks/api/TasksConfiguration.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Advanced configuration options for distributed and converged task handling. */
+@ConfigMapping(prefix = "polaris.coordinated-tasks")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTasksConfiguration.class)
+@JsonDeserialize(as = ImmutableTasksConfiguration.class)
+@JsonTypeName(TasksConfiguration.TYPE_ID)
+public interface TasksConfiguration {
+  String TYPE_ID = "coordinated-tasks";
+
+  int DEFAULT_RETAINED_HISTORY_LIMIT = 5;
+
+  /** Number of history entries for task scheduling/submission updates. */
+  @WithDefault("" + DEFAULT_RETAINED_HISTORY_LIMIT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt retainedHistoryLimit();
+
+  String DEFAULT_UPDATE_STATE_INTERVAL_STRING = "PT1S";
+  Duration DEFAULT_UPDATE_STATE_INTERVAL = Duration.parse(DEFAULT_UPDATE_STATE_INTERVAL_STRING);
+
+  /**
+   * The state of running tasks is regularly updated in the task store. This parameter defines the
+   * interval of these updates.
+   */
+  @WithDefault(DEFAULT_UPDATE_STATE_INTERVAL_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> updateStateInterval();
+
+  String DEFAULT_LOST_NOT_BEFORE_STRING = "PT5M";
+  Duration DEFAULT_LOST_NOT_BEFORE = Duration.parse(DEFAULT_LOST_NOT_BEFORE_STRING);
+
+  /**
+   * Tasks in status "running" are considered as "lost" when the last state-update was before the
+   * duration specified by this parameter.
+   *
+   * <p>Running tasks regularly update the task store with their current state (see {@code
+   * updateStateInterval}). Tasks states with a {@code lostNotBefore} in the past are considered as
+   * "dead", which means that the task's execution can be resumed by another node.

Review Comment:
   Suggestion: `Tasks states` -> `Tasks state`
   
   Also, AFAIU we do not have the ability to **resume** a task execution (think: from a checkpoint, à la MapReduce). Instead, we have the ability to **retry** a task execution, with at-least-once semantics. Should the word `resumed` be replaced?



##########
tasks/store/src/main/java/org/apache/polaris/tasks/store/TaskStore.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tasks.store;
+
+import jakarta.annotation.Nonnull;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.function.Predicate;
+import org.apache.polaris.tasks.api.TaskId;
+import org.apache.polaris.tasks.api.Tasks;
+
+/** Persistence interface for {@link Tasks} implementations. */
+public interface TaskStore {
+  /**
+   * Used to generate a new, unique ID.
+   *
+   * @param realmId current realm ID
+   * @return new, unique ID
+   */
+  long generateId(String realmId);
+
+  /**
+   * Retrieve tasks with a {@linkplain TaskState#scheduleNotBefore() schedule timestamp} up to
+   * (including) {@code now}.
+   *
+   * <p>While it would be programmatically easier to return a {@link java.util.stream.Stream} from
+   * this function, letting it return a {@link List} helps keeping (backend database) resource usage
+   * enclosed in the implementation.
+   *
+   * @param now maximum schedule-not-before timestamp. The implementation filters out entries that
+   *     have no or a higher schedule-not-before timestamp before calling the {@code filter}, which
+   *     allows pushing down this value as a predicate to the persistence layer.
+   * @param filter filter to be matched, only entries for which this predicate yields {@code true}
+   *     are returned.
+   * @param limit maximum number of entries to return, empty means unlimited. Only eligible tasks
+   *     that match the maximum schedule-not-before timestamp and pass the filter test are counted
+   *     towards this value.
+   * @return matching tasks, the order of elements within the returned list is undefined

Review Comment:
   nit: given that `List` allows index-based access, and the order of elements is undefined, it might make sense to return a `Collection` instead of a `List`.
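
   A minimal sketch of the suggested signature, using hypothetical simplified types: `ScheduledEntry` stands in for `TaskStoreResult`, and `InMemorySketchTaskStore` is not part of the PR. Returning a materialized `Collection` keeps backend resources enclosed in the implementation, as the Javadoc intends, without implying a meaningful order. The filtering mirrors the Javadoc: entries with no or a later schedule-not-before timestamp are dropped before `filter` runs, and only entries passing both checks count towards `limit`.

   ```java
   import java.time.Instant;
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Optional;
   import java.util.OptionalInt;
   import java.util.function.Predicate;

   // Hypothetical, simplified task entry; the PR uses TaskStoreResult instead.
   record ScheduledEntry(long id, Optional<Instant> scheduleNotBefore) {}

   // Suggested shape: a Collection makes "no defined order" explicit in the type.
   interface SketchTaskStore {
     Collection<ScheduledEntry> scheduledTasks(
         Instant now, Predicate<ScheduledEntry> filter, OptionalInt limit);
   }

   final class InMemorySketchTaskStore implements SketchTaskStore {
     private final List<ScheduledEntry> entries = new ArrayList<>();

     void add(ScheduledEntry entry) {
       entries.add(entry);
     }

     @Override
     public Collection<ScheduledEntry> scheduledTasks(
         Instant now, Predicate<ScheduledEntry> filter, OptionalInt limit) {
       return entries.stream()
           // "now" is applied first, as a persistence layer could push it down as a predicate.
           .filter(e -> e.scheduleNotBefore().map(t -> !t.isAfter(now)).orElse(false))
           .filter(filter)
           // Only entries that passed both checks count towards the limit.
           .limit(limit.isPresent() ? limit.getAsInt() : Long.MAX_VALUE)
           .toList();
     }
   }
   ```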



##########
tasks/store/src/main/java/org/apache/polaris/tasks/store/TaskStore.java:
##########
@@ -0,0 +1,121 @@
+  @Nonnull
+  List<TaskStoreResult> scheduledTasks(
+      @Nonnull Instant now, @Nonnull Predicate<TaskStoreResult> filter, OptionalInt limit);
+
+  /**
+   * Retrieve the {@linkplain TaskState task state} and the {@linkplain TaskHandle task handle} to
+   * it for the given ID.
+   *
+   * <p>Tasks that have reached a final state may not be returned.
+   *
+   * @throws IllegalArgumentException if the task does not exist
+   */
+  Optional<TaskStoreResult> fetchTask(@Nonnull TaskId taskId);
+
+  /**
+   * Fetch the task state for the given {@linkplain TaskHandle task handle}.
+   *
+   * @throws IllegalArgumentException if the task does not exist
+   */
+  Optional<TaskState> fetchTaskState(@Nonnull TaskHandle taskHandle);
+
+  /**
+   * Update the task state for the given ID.
+   *
+   * <p>The {@code updater} function might be called multiple times, for example, when
+   * commit-retries happen. Therefore {@code updater} function must be free of side effects.
+   *
+   * @param updater Update function must be idempotent and expect to be invoked multiple times. Gets
+   *     the current state as its input may throw

Review Comment:
   ```
      * @param updater the update function, which must be idempotent and expect to be invoked multiple
      *     times. It receives the current state as input and returns the desired change.  It may
      *     throw an exception in the event of a failure.
   ```
   
   I assume the semantics here are as follows:
   * If the updater function returns a `TaskStoreResult`, this new state should be persisted to the task store
   * If it returns an empty optional, then the task should be deleted from the task store
   * If it throws, then an error occurred and an updated state should be recomputed
   
   Is this correct?  Those semantics should probably be described in the Javadoc.
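
   To make the "called multiple times" part concrete, here is a minimal in-memory sketch of an optimistic update loop. It assumes the semantics listed above, which the PR has not confirmed: a present result is persisted, an empty result deletes the task, and an exception propagates to the caller without committing anything. `SketchStateCell` and its `update` signature are hypothetical.

   ```java
   import java.util.Optional;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Function;

   // Hypothetical single-task store; Optional.empty() in the cell means "task deleted".
   final class SketchStateCell<S> {
     private final AtomicReference<Optional<S>> current;

     SketchStateCell(S initial) {
       this.current = new AtomicReference<>(Optional.of(initial));
     }

     Optional<S> get() {
       return current.get();
     }

     Optional<S> update(Function<S, Optional<S>> updater) {
       while (true) {
         Optional<S> observed = current.get();
         if (observed.isEmpty()) {
           throw new IllegalArgumentException("task does not exist");
         }
         // Exceptions thrown here propagate and nothing is committed.
         Optional<S> desired = updater.apply(observed.get());
         // Commit only if nobody else changed the state meanwhile; otherwise run the updater
         // again on the fresh state. This retry is why the updater must be side-effect free.
         if (current.compareAndSet(observed, desired)) {
           return desired;
         }
       }
     }
   }
   ```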



##########
tasks/spi/src/main/java/org/apache/polaris/tasks/spi/TaskIdGenerator.java:
##########
@@ -0,0 +1,46 @@
+package org.apache.polaris.tasks.spi;
+
+import com.google.common.hash.PrimitiveSink;
+import java.util.function.Consumer;
+import org.apache.polaris.tasks.api.TaskId;
+import org.apache.polaris.tasks.api.TaskParameter;
+import org.apache.polaris.tasks.api.TaskResult;
+import org.apache.polaris.tasks.api.Tasks;
+
+public interface TaskIdGenerator {
+
+  /** Used to generate a non-deterministic and unique task ID. */
+  TaskId generateNonDeterministicUniqueId(String realmId);
+
+  /**
+   * Used to generate a deterministic task ID, from the contents of a {@link TaskParameter}.
+   *
+   * <p>Deterministic task IDs are useful to converge requests for the same task to a single task
+   * execution. {@link Tasks} implementations ensure that task submissions against the same task ID
+   * are executed only once and all submissions share the same {@linkplain TaskResult task result}.
+   *
+   * <p>Implementations must include the given {@code realmId} in the task ID generation
+   * respectively the calculation.
+   */
+  // TODO this makes Guava a part of this API - acceptable in this case?
+  @SuppressWarnings("UnstableApiUsage")
+  TaskId generateDeterministicId(String realmId, Consumer<PrimitiveSink> funnel);

Review Comment:
   That part confuses me the most. The Javadoc says that this method consumes the content of a `TaskParameter`, but the method itself does not accept that kind of object. Shouldn't it accept zero or more `TaskParameter` instances instead of a consumer of a Guava class that itself consumes primitive types?
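
   One possible Guava-free alternative, sketched with hypothetical names (`CanonicalParameter`, `DeterministicIdSketch`), is to let each parameter provide a stable byte encoding and hash the realm ID together with those bytes. It uses SHA-256 via `java.security.MessageDigest`; the returned `UUID` is just the first 128 bits of the hash, not an RFC 4122 name-based UUID. Hashing the realm ID into the digest is what keeps identical parameters in different realms from converging to the same task.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   import java.util.UUID;

   // Hypothetical: a task parameter that can provide a stable, canonical byte encoding.
   interface CanonicalParameter {
     byte[] canonicalBytes();
   }

   final class DeterministicIdSketch {
     private DeterministicIdSketch() {}

     /** Same realm plus same parameters (in the same order) always yields the same ID. */
     static UUID generate(String realmId, CanonicalParameter... params) {
       MessageDigest digest;
       try {
         digest = MessageDigest.getInstance("SHA-256");
       } catch (NoSuchAlgorithmException e) {
         throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
       }
       appendLengthPrefixed(digest, realmId.getBytes(StandardCharsets.UTF_8));
       for (CanonicalParameter param : params) {
         appendLengthPrefixed(digest, param.canonicalBytes());
       }
       ByteBuffer hash = ByteBuffer.wrap(digest.digest());
       return new UUID(hash.getLong(), hash.getLong());
     }

     // Length prefixes keep ("ab", "c") and ("a", "bc") from hashing identically.
     private static void appendLengthPrefixed(MessageDigest digest, byte[] bytes) {
       digest.update(ByteBuffer.allocate(Integer.BYTES).putInt(bytes.length).array());
       digest.update(bytes);
     }
   }
   ```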
   
   



##########
tasks/store/src/main/java/org/apache/polaris/tasks/store/TaskState.java:
##########
@@ -0,0 +1,146 @@
+package org.apache.polaris.tasks.store;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.tasks.api.TaskBehaviorId;
+import org.apache.polaris.tasks.api.TaskId;
+import org.apache.polaris.tasks.api.TaskParameter;
+import org.apache.polaris.tasks.api.TaskResult;
+import org.apache.polaris.tasks.spi.TaskBehavior;
+import org.apache.polaris.tasks.spi.TaskExecutionError;
+import org.immutables.value.Value;
+
+/** Represents the state of a particular task execution. */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableTaskState.class)
+@JsonDeserialize(as = ImmutableTaskState.class)
+public interface TaskState {
+  TaskId taskId();
+
+  /**
+   * ID of the {@linkplain TaskBehavior behavior}.
+   *
+   * <p>Users of a {@link TaskState} object must be aware that a {@linkplain TaskBehavior behavior}
+   * may not be available. Unknown behaviors must not be prematurely deleted, because another
+   * instance might be able to handle those behaviors. Rolling upgrades are legit use cases for the
+   * case hitting an unknown behavior.
+   */
+  TaskBehaviorId behaviorId();
+
+  /** The current task status. */
+  TaskStatus status();
+
+  /** Represents an error message, intended for humans, not machines. */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<TaskExecutionError> error();
+
+  /** Represents the earliest timestamp when a task can be run (again). */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<Instant> scheduleNotBefore();
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<Instant> executedAt();
+
+  /**
+   * Represents the earliest timestamp when a task service can assume that a {@link
+   * TaskStatus#RUNNING RUNNING} task is lost and should be re-started. Only valid for {@link
+   * TaskStatus#RUNNING RUNNING}.
+   */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<Instant> lostNotBefore();
+
+  TaskParameter taskParam();
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<TaskResult> taskResult();
+
+  static ImmutableTaskState.Builder builder() {
+    return ImmutableTaskState.builder();
+  }
+
+  @SuppressWarnings("UnnecessaryDefault")
+  @JsonIgnore
+  default boolean isScheduled(Instant now) {

Review Comment:
   It seems to me that this method returns `true` if the current task is scheduled **in the future**. However, if it is `SCHEDULED` but the current date is after the scheduled date, then this method returns `false`. Is this intended? Maybe add a Javadoc to explain how this method is expected to be used?
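
   If `isScheduled(now)` is indeed meant to answer "scheduled for later", splitting it into two explicitly documented predicates could remove the ambiguity. The sketch below uses hypothetical simplified types (`SchedStatus`, `SchedState`) and hypothetical method names (`isScheduledInFuture`, `isDue`). It assumes a `SCHEDULED` task without a `scheduleNotBefore` is neither, consistent with `scheduledTasks` filtering such entries out.

   ```java
   import java.time.Instant;
   import java.util.Optional;

   // Hypothetical, simplified stand-ins for TaskStatus and TaskState.
   enum SchedStatus { SCHEDULED, RUNNING, SUCCESS, FAILURE }

   record SchedState(SchedStatus status, Optional<Instant> scheduleNotBefore) {

     /** SCHEDULED, and not allowed to run before a point in time strictly after {@code now}. */
     boolean isScheduledInFuture(Instant now) {
       return status == SchedStatus.SCHEDULED
           && scheduleNotBefore.map(t -> t.isAfter(now)).orElse(false);
     }

     /** SCHEDULED, and allowed to run at {@code now} (including overdue tasks). */
     boolean isDue(Instant now) {
       return status == SchedStatus.SCHEDULED
           && scheduleNotBefore.map(t -> !t.isAfter(now)).orElse(false);
     }
   }
   ```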



##########
tasks/spi/src/main/java/org/apache/polaris/tasks/spi/TaskBehavior.java:
##########
@@ -0,0 +1,134 @@
+package org.apache.polaris.tasks.spi;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.polaris.tasks.api.TaskBehaviorId;
+import org.apache.polaris.tasks.api.TaskId;
+import org.apache.polaris.tasks.api.TaskParameter;
+import org.apache.polaris.tasks.api.TaskResult;
+import org.apache.polaris.tasks.api.Tasks;
+
+/**
+ * Task behaviors provide/define how tasks behave and how tasks are handled.
+ *
+ * <p>Implementations are provided as {@link ApplicationScoped @ApplicationScoped} beans.
+ *
+ * <p>Each behavior must define its {@linkplain TaskParameter input parameter} and {@linkplain
+ * TaskResult result} types.
+ */
+public interface TaskBehavior<PARAM extends TaskParameter, RESULT extends TaskResult> {
+  /** Human-readable name. */
+  String name();
+
+  /** Globally unique ID of the task behavior. */
+  TaskBehaviorId id();
+
+  Class<PARAM> paramType();
+
+  Class<RESULT> resultType();
+
+  /**
+   * Provide a task-runnable that can perform the task behavior's operation.

Review Comment:
   ```suggestion
      * Provides a task function that can perform the task behavior's operation.
   ```



##########
tasks/spi/src/main/java/org/apache/polaris/tasks/spi/TaskIdGenerator.java:
##########
@@ -0,0 +1,46 @@
+   * <p>Implementations must include the given {@code realmId} in the task ID generation
+   * respectively the calculation.

Review Comment:
   The `respectively the calculation` part sounds odd; is this intended?



##########
tasks/store/src/main/java/org/apache/polaris/tasks/store/TaskHandle.java:
##########
@@ -0,0 +1,26 @@
+package org.apache.polaris.tasks.store;
+
+/**
+ * Represents the store implementation-specific and opaque handle for one specific task execution.
+ *
+ * <p>Implementation note: must implement {@link Object#equals(Object)}!

Review Comment:
   ```suggestion
    * <p>Implementation note: implementations must override {@link Object#equals(Object)} and {@link
    * Object#hashCode()}.
   ```
   
   I assume we want to follow good Java practices here :-) 
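
   For illustration, two ways an implementation could satisfy that note, using a hypothetical `SketchTaskHandle` in place of `TaskHandle` and made-up fields (`realmId`, `rowId`, `key`, `version`): a Java record, which gets value-based `equals` and `hashCode` generated from its components, or a regular class overriding both methods consistently. Without both overrides, handles would misbehave as keys in hash-based collections.

   ```java
   import java.util.Objects;

   // Hypothetical marker interface standing in for the PR's TaskHandle.
   interface SketchTaskHandle {}

   // Option 1: a record derives equals() and hashCode() from all components.
   record RowTaskHandle(String realmId, long rowId) implements SketchTaskHandle {}

   // Option 2: a plain class must override both methods, using the same fields in each.
   final class KeyTaskHandle implements SketchTaskHandle {
     private final String key;
     private final long version;

     KeyTaskHandle(String key, long version) {
       this.key = Objects.requireNonNull(key, "key");
       this.version = version;
     }

     @Override
     public boolean equals(Object o) {
       if (this == o) {
         return true;
       }
       if (!(o instanceof KeyTaskHandle other)) {
         return false;
       }
       return version == other.version && key.equals(other.key);
     }

     @Override
     public int hashCode() {
       return Objects.hash(key, version);
     }
   }
   ```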



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
