This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dd46b979 NoSQL: Node IDs - API, SPI + general implementation (#2728)
3dd46b979 is described below

commit 3dd46b979e6d96774cf9c35dcf006759c43a2d5f
Author: Robert Stupp <[email protected]>
AuthorDate: Tue Oct 28 23:34:54 2025 +0100

    NoSQL: Node IDs - API, SPI + general implementation (#2728)
    
    * NoSQL: Node IDs - API, SPI + general implementation
    
    This PR provides a mechanism to assign a Polaris-cluster-wide unique 
node-ID to each Polaris instance, which is then used when generating 
Polaris-cluster-wide unique Snowflake-IDs.
    
    The change is fundamental for the NoSQL work, but also demanded for the 
existing relational JDBC persistence.
    
    Does not include any persistence specific implementation.
    
    * NoSQL: Fail node-management-impl init after timeout
    
    Also move the expensive part to a `@PostConstruct` to not block CDI 
entirely from initializing.
---
 bom/build.gradle.kts                               |   4 +
 gradle/projects.main.properties                    |   4 +
 persistence/nosql/nodes/README.md                  |  34 ++
 persistence/nosql/nodes/api/build.gradle.kts       |  44 ++
 .../java/org/apache/polaris/nodeids/api/Node.java  |  48 ++
 .../org/apache/polaris/nodeids/api/NodeLease.java  |  45 ++
 .../apache/polaris/nodeids/api/NodeManagement.java |  72 +++
 .../polaris/nodeids/api/NodeManagementConfig.java  |  87 +++
 persistence/nosql/nodes/impl/build.gradle.kts      |  64 +++
 .../polaris/nodeids/impl/NodeManagementImpl.java   | 608 +++++++++++++++++++++
 .../java/org/apache/polaris/nodeids/impl/Util.java |  38 ++
 .../apache/polaris/nodeids/impl/package-info.java  |  20 +
 .../impl/src/main/resources/META-INF/beans.xml     |  24 +
 .../polaris/nodeids/impl/TestNodeLeases.java       | 221 ++++++++
 .../nodeids/impl/TestNodeManagementImpl.java       | 244 +++++++++
 .../nodes/impl/src/test/resources/logback-test.xml |  32 ++
 .../apache/polaris/nodeids/impl/MockNodeStore.java |  74 +++
 .../polaris/nodeids/impl/MockNodeStoreFactory.java |  56 ++
 persistence/nosql/nodes/spi/build.gradle.kts       |  41 ++
 .../polaris/nodeids/spi/NodeManagementState.java   |  30 +
 .../org/apache/polaris/nodeids/spi/NodeState.java  |  30 +
 .../org/apache/polaris/nodeids/spi/NodeStore.java  |  34 ++
 .../polaris/nodeids/spi/NodeStoreFactory.java      |  32 ++
 23 files changed, 1886 insertions(+)

diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 90281d65f..c6f0713eb 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -44,6 +44,10 @@ dependencies {
     api(project(":polaris-idgen-impl"))
     api(project(":polaris-idgen-spi"))
 
+    api(project(":polaris-nodes-api"))
+    api(project(":polaris-nodes-impl"))
+    api(project(":polaris-nodes-spi"))
+
     api(project(":polaris-config-docs-annotations"))
     api(project(":polaris-config-docs-generator"))
 
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index f6c46f8be..c77a06fd7 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -57,5 +57,9 @@ polaris-idgen-api=persistence/nosql/idgen/api
 polaris-idgen-impl=persistence/nosql/idgen/impl
 polaris-idgen-mocks=persistence/nosql/idgen/mocks
 polaris-idgen-spi=persistence/nosql/idgen/spi
+# nodes
+polaris-nodes-api=persistence/nosql/nodes/api
+polaris-nodes-impl=persistence/nosql/nodes/impl
+polaris-nodes-spi=persistence/nosql/nodes/spi
 # persistence / database agnostic
 polaris-persistence-nosql-varint=persistence/nosql/persistence/varint
diff --git a/persistence/nosql/nodes/README.md 
b/persistence/nosql/nodes/README.md
new file mode 100644
index 000000000..19370b82f
--- /dev/null
+++ b/persistence/nosql/nodes/README.md
@@ -0,0 +1,34 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Uniquely identify running Polaris nodes
+
+Some ID generation mechanisms,
+like 
[Snowflake-IDs](https://medium.com/@jitenderkmr/demystifying-snowflake-ids-a-unique-identifier-in-distributed-computing-72796a827c9d),
+require unique integer IDs for each running node. This framework provides a 
mechanism to assign each running node a
+unique integer ID.
+
+## Code structure
+
+The code is structured into multiple modules. Consuming code should almost 
always pull in only the API module.
+
+* `polaris-nodes-api` provides the necessary Java interfaces and immutable 
types.
+* `polaris-nodes-impl` provides the storage agnostic implementation.
+* `polaris-nodes-spi` provides the necessary interfaces to provide a storage 
specific implementation.
+* `polaris-nodes-store-nosql` provides the storage implementation based on 
`polaris-persistence-nosql-api`.
diff --git a/persistence/nosql/nodes/api/build.gradle.kts 
b/persistence/nosql/nodes/api/build.gradle.kts
new file mode 100644
index 000000000..3374ed48f
--- /dev/null
+++ b/persistence/nosql/nodes/api/build.gradle.kts
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Build script of the `polaris-nodes-api` module.
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris nodes API, no concrete implementations"
+
+dependencies {
+  implementation(project(":polaris-idgen-api"))
+
+  // Polaris immutables: annotations at compile time plus the matching annotation processor.
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  // Jackson version is managed via the Jackson BOM.
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+
+  // Jakarta APIs are only needed on the compile classpath of this module.
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  // Configuration-mapping APIs, compile classpath only; Quarkus version is managed via its BOM.
+  compileOnly(libs.smallrye.config.core)
+  compileOnly(platform(libs.quarkus.bom))
+  compileOnly("io.quarkus:quarkus-core")
+}
diff --git 
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/Node.java
 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/Node.java
new file mode 100644
index 000000000..4fb7e82bb
--- /dev/null
+++ 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/Node.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import jakarta.annotation.Nullable;
+import java.time.Instant;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Represents a node's ID together with informative state about its lease.
+ *
+ * <p>Annotated with {@link PolarisImmutable}; instances are created via the generated builder.
+ */
+@PolarisImmutable
+public interface Node {
+  /**
+   * Returns the ID of this node.
+   *
+   * @return ID of this node
+   * @throws IllegalStateException if the lease is no longer valid, for example, because it expired
+   *     before being renewed
+   */
+  int id();
+
+  /**
+   * Checks whether this node's lease has not yet expired at the given point in time.
+   *
+   * @param nowInMillis the current time in milliseconds since the epoch
+   * @return {@code true} if {@code nowInMillis} is strictly before {@link #expirationTimestamp()}
+   */
+  default boolean valid(long nowInMillis) {
+    return nowInMillis < expirationTimestamp().toEpochMilli();
+  }
+
+  /** Timestamp of the lease; presumably the time the lease was acquired (TODO confirm). */
+  Instant leaseTimestamp();
+
+  /**
+   * Renewal-related timestamp of the lease, may be {@code null}. NOTE(review): presumably the time
+   * at which the lease is due for renewal; confirm against the implementation.
+   */
+  @Nullable
+  Instant renewLeaseTimestamp();
+
+  /** Timestamp since which this node's lease is no longer valid. */
+  Instant expirationTimestamp();
+}
diff --git 
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeLease.java
 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeLease.java
new file mode 100644
index 000000000..ee88a389f
--- /dev/null
+++ 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeLease.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import jakarta.annotation.Nullable;
+
+/** A lease on a node ID, as handed out by {@link NodeManagement#lease()}. */
+public interface NodeLease {
+  /**
+   * Returns the {@link Node} representation for this lease, or {@code null} if the lease has been
+   * released.
+   */
+  @Nullable
+  Node node();
+
+  /**
+   * Permanently releases the lease. Does nothing if the lease is already released. Throws if
+   * persisting the released state fails.
+   */
+  void release();
+
+  /**
+   * Forces a lease renewal, which is generally neither recommended nor necessary. Throws if the
+   * lease is already released.
+   */
+  void renew();
+
+  /** Returns the node ID if the lease is active/valid or {@code -1}. */
+  int nodeIdIfValid();
+}
diff --git 
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagement.java
 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagement.java
new file mode 100644
index 000000000..dd3cf72b6
--- /dev/null
+++ 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+
+/**
+ * API to lease node IDs, primarily to generate {@linkplain SnowflakeIdGenerator snowflake IDs}.
+ *
+ * <p>The default configuration for the snowflake IDs allows generation of 4096 IDs per millisecond
+ * (12 sequence bits), which should be more than enough. As a consequence, it is very likely enough
+ * to have only one ID generator per JVM, across all realms and catalogs.
+ *
+ * <p>Implementation is provided as an {@link ApplicationScoped @ApplicationScoped} bean.
+ */
+public interface NodeManagement extends AutoCloseable {
+  /**
+   * Build a <em>new</em> and <em>independent</em> ID generator instance for a {@linkplain #lease()
+   * leased node}.
+   *
+   * <p>This function must only be called from {@link ApplicationScoped @ApplicationScoped} CDI
+   * producers providing the same {@link IdGenerator} for the lifetime of the given lease, that is,
+   * at most once per leased node.
+   *
+   * @param leasedNode the lease that supplies the node ID for the generator
+   * @return a new ID generator bound to the given lease
+   */
+  IdGenerator buildIdGenerator(@Nonnull NodeLease leasedNode);
+
+  /** The maximum number of concurrently leased nodes that are supported. */
+  int maxNumberOfNodes();
+
+  /**
+   * Retrieve information about a specific node.
+   *
+   * @param nodeId the node ID to look up
+   * @return the node information, or an empty optional if none is available
+   */
+  Optional<Node> getNodeInfo(int nodeId);
+
+  /** Get the persistence ID for a node by its ID. */
+  long systemIdForNode(int nodeId);
+
+  /**
+   * Lease a node.
+   *
+   * <p>The implementation takes care of periodically renewing the lease. It is not necessary to
+   * explicitly call {@link NodeLease#renew()}.
+   *
+   * <p>It is possible that the {@linkplain NodeLease#nodeIdIfValid() ID of the leased node} changes
+   * over time.
+   *
+   * <p>Each invocation returns a new, independent lease.
+   *
+   * @return the leased node
+   * @throws IllegalStateException if no node ID could be leased
+   */
+  @Nonnull
+  NodeLease lease();
+}
diff --git 
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagementConfig.java
 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagementConfig.java
new file mode 100644
index 000000000..c50aa951b
--- /dev/null
+++ 
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagementConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/**
+ * Node management configuration, mapped from configuration properties below the {@code
+ * polaris.node} prefix.
+ */
+@ConfigMapping(prefix = "polaris.node")
+@JsonSerialize(as = ImmutableBuildableNodeManagementConfig.class)
+@JsonDeserialize(as = ImmutableBuildableNodeManagementConfig.class)
+public interface NodeManagementConfig {
+  /** Duration of a node-lease. */
+  @WithDefault(DEFAULT_LEASE_DURATION)
+  Duration leaseDuration();
+
+  /** Time window before the end of a node lease when the lease will be renewed. */
+  @WithDefault(DEFAULT_RENEWAL_PERIOD)
+  Duration renewalPeriod();
+
+  /**
+   * Maximum number of concurrently active Polaris nodes. Do not change this value or the ID
+   * generator spec, it is a rather internal property. See ID generator spec below.
+   */
+  @WithDefault(DEFAULT_NUM_NODES)
+  int numNodes();
+
+  /**
+   * Configuration needed to build an {@linkplain IdGenerator ID generator}. This configuration
+   * cannot be changed after one Polaris node has been successfully started. This specification will
+   * be ignored if a persisted one exists, but a warning shall be logged if both are different.
+   */
+  IdGeneratorSpec idGeneratorSpec();
+
+  // Defaults as strings, as required by the @WithDefault annotations above.
+  // The two durations are parsed with Duration.parse(...) below: 1 hour and 15 minutes.
+  String DEFAULT_LEASE_DURATION = "PT1H";
+  String DEFAULT_RENEWAL_PERIOD = "PT15M";
+  // 2 to the power of the default number of node-ID bits, rendered as a string.
+  String DEFAULT_NUM_NODES = "" + (1 << 
SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS);
+
+  /**
+   * Variant of {@link NodeManagementConfig} that can be constructed via a builder; the generated
+   * immutable class is also the Jackson (de)serialization target of {@link NodeManagementConfig}.
+   */
+  @PolarisImmutable
+  interface BuildableNodeManagementConfig extends NodeManagementConfig {
+
+    /** Returns a new builder for the generated immutable implementation. */
+    static ImmutableBuildableNodeManagementConfig.Builder builder() {
+      return ImmutableBuildableNodeManagementConfig.builder();
+    }
+
+    @Override
+    @Value.Default
+    default Duration leaseDuration() {
+      return Duration.parse(DEFAULT_LEASE_DURATION);
+    }
+
+    @Override
+    @Value.Default
+    default Duration renewalPeriod() {
+      return Duration.parse(DEFAULT_RENEWAL_PERIOD);
+    }
+
+    // NOTE(review): unlike the two methods above, this one is not annotated with @Value.Default.
+    // Presumably intentional so the value cannot be set via the builder - TODO confirm.
+    @Override
+    default int numNodes() {
+      return 1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS;
+    }
+  }
+}
diff --git a/persistence/nosql/nodes/impl/build.gradle.kts 
b/persistence/nosql/nodes/impl/build.gradle.kts
new file mode 100644
index 000000000..a76544ffb
--- /dev/null
+++ b/persistence/nosql/nodes/impl/build.gradle.kts
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Build script of the `polaris-nodes-impl` module.
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris nodes management implementation"
+
+dependencies {
+  // Node management API/SPI plus the ID generator and async APIs used by the implementation.
+  implementation(project(":polaris-nodes-api"))
+  implementation(project(":polaris-nodes-spi"))
+  implementation(project(":polaris-idgen-api"))
+  implementation(project(":polaris-idgen-spi"))
+  implementation(project(":polaris-async-api"))
+
+  implementation(libs.guava)
+  implementation(libs.slf4j.api)
+
+  compileOnly(platform(libs.jackson.bom))
+  compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+
+  // Polaris immutables: annotations at compile time plus the matching annotation processor.
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  // Jakarta APIs are only needed on the compile classpath of this module.
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  // Dependencies exposed by this module's test fixtures.
+  testFixturesApi(project(":polaris-idgen-api"))
+  testFixturesApi(project(":polaris-nodes-api"))
+  testFixturesApi(project(":polaris-nodes-spi"))
+
+  testFixturesRuntimeOnly(libs.smallrye.jandex)
+
+  testFixturesApi(platform(libs.jackson.bom))
+  testFixturesApi("com.fasterxml.jackson.core:jackson-annotations")
+
+  testImplementation(project(":polaris-idgen-mocks"))
+  testImplementation(project(":polaris-async-java"))
+  testImplementation(testFixtures(project(":polaris-async-api")))
+
+  testFixturesApi(libs.jakarta.annotation.api)
+}
+
+// Javadoc errors do not fail the build of this module.
+tasks.withType<Javadoc> { isFailOnError = false }
diff --git 
a/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/NodeManagementImpl.java
 
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/NodeManagementImpl.java
new file mode 100644
index 000000000..50ff3653d
--- /dev/null
+++ 
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/NodeManagementImpl.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.Integer.bitCount;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.ImmutableIdGeneratorSpec;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.spi.IdGeneratorFactory;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+import org.apache.polaris.nodeids.api.ImmutableNode;
+import org.apache.polaris.nodeids.api.Node;
+import org.apache.polaris.nodeids.api.NodeLease;
+import org.apache.polaris.nodeids.api.NodeManagement;
+import org.apache.polaris.nodeids.api.NodeManagementConfig;
+import org.apache.polaris.nodeids.spi.ImmutableBuildableNodeManagementState;
+import org.apache.polaris.nodeids.spi.ImmutableNodeState;
+import org.apache.polaris.nodeids.spi.NodeState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+import org.apache.polaris.nodeids.spi.NodeStoreFactory;
+import org.apache.polaris.nosql.async.AsyncExec;
+import org.apache.polaris.nosql.async.Cancelable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+class NodeManagementImpl implements NodeManagement {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NodeManagementImpl.class);
+
+  // should be a power of 2
+  private static final int CHECK_BATCH_SIZE = 16;
+  static final Duration RESCHEDULE_AFTER_FAILURE = Duration.ofSeconds(10);
+  static final Duration RESCHEDULE_UNTIL_EXPIRATION = Duration.ofMinutes(1);
+  static final Duration RENEWAL_MIN_LEFT_FOR_RENEWAL = Duration.ofSeconds(30);
+  private final NodeManagementConfig config;
+  private final MonotonicClock clock;
+  private final int numNodeIds;
+  private final Set<NodeLeaseImpl> registeredLeases = 
ConcurrentHashMap.newKeySet();
+  private final AsyncExec scheduler;
+  private final NodeStoreFactory nodeStoreFactory;
+
+  private NodeStore nodeStore;
+  private IdGeneratorFactory<?> idGenFactory;
+  private IdGeneratorSpec idGenSpec;
+  private IdGenerator systemIdGen;
+
+  private volatile boolean closed;
+
+  /**
+   * Creates the node management instance and validates the lease-related configuration.
+   *
+   * <p>All violated constraints are collected and reported in a single exception, see {@link
+   * #checkArgs(Runnable...)}. The node store itself is set up later in {@link #init()}.
+   *
+   * @param config node management configuration; {@code leaseDuration} and the active period
+   *     ({@code leaseDuration - renewalPeriod}) must each be at least 15 minutes, and {@code
+   *     numNodes} must be a power of 2 that is not smaller than {@code CHECK_BATCH_SIZE}
+   * @param clock clock used for lease timestamps and ID generation
+   * @param nodeStoreFactory factory providing access to the persisted node management state
+   * @param scheduler executor for asynchronous work
+   * @throws IllegalArgumentException if one or more configuration constraints are violated
+   */
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  @Inject
+  NodeManagementImpl(
+      NodeManagementConfig config,
+      MonotonicClock clock,
+      NodeStoreFactory nodeStoreFactory,
+      AsyncExec scheduler) {
+    var activePeriod = config.leaseDuration().minus(config.renewalPeriod());
+    this.nodeStoreFactory = nodeStoreFactory;
+    this.numNodeIds = config.numNodes();
+    checkArgs(
+        () ->
+            checkArgument(
+                // ">= 0" so that exactly 15 minutes is accepted, as the message states.
+                config.leaseDuration().compareTo(Duration.ofMinutes(15)) >= 0,
+                "leaseDuration must be at least 15 minutes"),
+        () ->
+            checkArgument(
+                activePeriod.isPositive(), "leaseDuration must be greater than renewalPeriod"),
+        () ->
+            checkArgument(
+                // ">= 0" so that exactly 15 minutes is accepted, as the message states.
+                activePeriod.compareTo(Duration.ofMinutes(15)) >= 0,
+                "active period (leaseDuration - renewalPeriod) must be at least 15 minutes"),
+        () ->
+            checkArgument(
+                numNodeIds >= CHECK_BATCH_SIZE && bitCount(numNodeIds) == 1,
+                "numNodeIds %s must not be smaller than %s and a power of 2",
+                numNodeIds,
+                CHECK_BATCH_SIZE));
+    this.config = config;
+    this.clock = clock;
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Resolves the effective ID generator specification and creates the node store.
+   *
+   * <p>Loops until the node management state could either be fetched from or persisted via the
+   * {@link NodeStoreFactory}, sleeping a random 10 to 499 milliseconds between attempts. A
+   * persisted specification takes precedence over the configured one; a mismatch is only reported
+   * via {@link #warnOnIncompatibleIdGeneratorSpec(IdGeneratorSpec)}.
+   *
+   * @throws IllegalStateException if the loop did not complete within 10 minutes, or if the thread
+   *     was interrupted while sleeping between attempts
+   */
+  @SuppressWarnings("BusyWait")
+  @PostConstruct
+  void init() {
+    // Note: this local variable shadows the field of the same name; the field is only assigned
+    // after the loop below.
+    var idGenSpec =
+        (IdGeneratorSpec) 
ImmutableIdGeneratorSpec.builder().from(config.idGeneratorSpec()).build();
+    // Source with a fixed node ID of 0, only passed to validateParameters(...) below.
+    var validationIdGeneratorSource =
+        new IdGeneratorSource() {
+          @Override
+          public long currentTimeMillis() {
+            return clock.currentTimeMillis();
+          }
+
+          @Override
+          public int nodeId() {
+            return 0;
+          }
+        };
+
+    // If this loop doesn't complete within 10 minutes, we can only give up.
+    var timeout = clock.currentInstant().plus(Duration.ofMinutes(10));
+    while (true) {
+      var existingNodeManagementState = 
nodeStoreFactory.fetchManagementState();
+      if (existingNodeManagementState.isPresent()) {
+        // A persisted state exists: its specification replaces the configured one.
+        var spec = 
Util.idgenSpecFromManagementState(existingNodeManagementState);
+        if (!idGenSpec.equals(spec)) {
+          warnOnIncompatibleIdGeneratorSpec(spec);
+          idGenSpec = spec;
+        }
+        var factory = IdGeneratorFactory.lookupFactory(idGenSpec.type());
+        // try to build an ID generator instance to validate the spec
+        factory.validateParameters(idGenSpec.params(), 
validationIdGeneratorSource);
+        idGenFactory = factory;
+        break;
+      } else {
+        // Nothing persisted yet: validate the configured spec and try to persist it.
+        var factory = IdGeneratorFactory.lookupFactory(idGenSpec.type());
+        // try to build an ID generator instance to validate the spec
+        factory.validateParameters(idGenSpec.params(), 
validationIdGeneratorSource);
+
+        if (nodeStoreFactory.storeManagementState(
+            
ImmutableBuildableNodeManagementState.builder().idGeneratorSpec(idGenSpec).build()))
 {
+          LOGGER.info("Persisted node management configuration.");
+          idGenFactory = factory;
+          break;
+        }
+      }
+      // Only reached when persisting the state did not succeed - presumably because another node
+      // persisted its state concurrently (TODO confirm). Retry unless the timeout has elapsed.
+      if (timeout.isBefore(clock.currentInstant())) {
+        throw new IllegalStateException(
+            "Timed out to fetch and/or persist node management configuration. 
This is likely due to an overloaded backend database.");
+      }
+      try {
+        // random sleep
+        Thread.sleep(ThreadLocalRandom.current().nextInt(10, 500));
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before giving up.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for node management configuration to be 
fetched/persisted",
+            e);
+      }
+    }
+
+    this.idGenSpec = idGenSpec;
+
+    // The system ID generator is kept for systemIdForNode(...) and handed to the node store.
+    var nodeIdGen = idGenFactory.buildSystemIdGenerator(idGenSpec.params());
+    this.systemIdGen = nodeIdGen;
+    this.nodeStore = nodeStoreFactory.createNodeStore(nodeIdGen);
+  }
+
+  /**
+   * Runs every given check and reports all failures at once.
+   *
+   * <p>Each check is executed even if an earlier one failed. The messages of all thrown {@link
+   * IllegalArgumentException}s are joined with {@code ", "} into a single exception. Other
+   * exception types thrown by a check propagate immediately.
+   *
+   * @param checks the checks to run, in order
+   * @throws IllegalArgumentException if at least one check threw an {@link
+   *     IllegalArgumentException}
+   */
+  static void checkArgs(Runnable... checks) {
+    var failures = new ArrayList<String>();
+    for (int i = 0; i < checks.length; i++) {
+      try {
+        checks[i].run();
+      } catch (IllegalArgumentException rejected) {
+        failures.add(rejected.getMessage());
+      }
+    }
+    if (failures.isEmpty()) {
+      return;
+    }
+    throw new IllegalArgumentException(String.join(", ", failures));
+  }
+
+  @Override
+  public long systemIdForNode(int nodeId) {
+    return systemIdGen.systemIdForNode(nodeId);
+  }
+
+  /**
+   * Builds an ID generator whose node ID is read from the given lease on every access and whose
+   * time is taken from this instance's clock.
+   *
+   * @param leasedNode the lease supplying the node ID via {@link NodeLease#nodeIdIfValid()}
+   * @return a new ID generator built from the effective ID generator specification
+   */
+  @Override
+  public IdGenerator buildIdGenerator(@Nonnull NodeLease leasedNode) {
+    IdGeneratorSource leaseBackedSource =
+        new IdGeneratorSource() {
+          @Override
+          public int nodeId() {
+            return leasedNode.nodeIdIfValid();
+          }
+
+          @Override
+          public long currentTimeMillis() {
+            return clock.currentTimeMillis();
+          }
+        };
+    var generatorParams = idGenSpec.params();
+    return idGenFactory.buildIdGenerator(generatorParams, leaseBackedSource);
+  }
+
+  /**
+   * Logs a warning that the configured ID generator specification differs from the persisted one.
+   *
+   * <p>This is called from {@link #init()} before the {@code idGenSpec} field is assigned, so the
+   * provided specification is read from the configuration. Logging the field would always print
+   * {@code null}.
+   *
+   * @param spec the persisted specification, which takes precedence over the configured one
+   */
+  @VisibleForTesting
+  void warnOnIncompatibleIdGeneratorSpec(IdGeneratorSpec spec) {
+    LOGGER.warn(
+        "Provided ID generator specification does not match the persisted, unmodifiable one: provided: {} - persisted: {}",
+        config.idGeneratorSpec(),
+        spec);
+  }
+
+  /** Returns the number of node IDs, as taken from the configuration's {@code numNodes()}. */
+  @Override
+  public int maxNumberOfNodes() {
+    return numNodeIds;
+  }
+
+  /**
+   * Looks up information about a node, preferring leases registered with this instance over the
+   * state held by the node store.
+   *
+   * @param nodeId the node ID, must be in the range {@code 0 <= nodeId < maxNumberOfNodes()}
+   * @return the node information, or an empty optional if neither a registered lease nor the node
+   *     store knows the node
+   * @throws IllegalArgumentException if {@code nodeId} is out of range
+   */
+  @Override
+  public Optional<Node> getNodeInfo(int nodeId) {
+    checkArgument(nodeId >= 0 && nodeId < numNodeIds, "Illegal node ID " + nodeId);
+
+    // Leases registered with this instance win; released leases report a null node.
+    for (var registered : registeredLeases) {
+      Node candidate = registered.node();
+      if (candidate != null && candidate.id() == nodeId) {
+        return Optional.of(candidate);
+      }
+    }
+
+    // Fall back to the node store; only the lease and expiration timestamps are copied.
+    return nodeStore
+        .fetch(nodeId)
+        .map(
+            persisted ->
+                ImmutableNode.builder()
+                    .id(nodeId)
+                    .leaseTimestamp(persisted.leaseTimestamp())
+                    .expirationTimestamp(persisted.expirationTimestamp())
+                    .build());
+  }
+
+  /**
+   * Leases a node ID and registers the resulting lease with this instance.
+   *
+   * @return a new lease
+   * @throws IllegalStateException if this instance has been closed or no node ID could be leased
+   */
+  @Override
+  @Nonnull
+  public NodeLease lease() {
+    var newLease = new NodeLeaseImpl(leaseInternal());
+    registeredLeases.add(newLease);
+    return newLease;
+  }
+
+  /**
+   * Tries to lease a node ID, first a candidate derived from the local network interfaces, then
+   * all node IDs in batches of {@code CHECK_BATCH_SIZE}.
+   *
+   * @return the parameters of the acquired lease
+   * @throws IllegalStateException if this instance has been closed or if no node ID could be
+   *     leased
+   */
+  private LeaseParams leaseInternal() {
+    LOGGER.debug("Leasing a node ID...");
+
+    checkState(!closed, "NodeManagement instance has been closed");
+
+    // The same timestamp is used for all lease attempts of this invocation.
+    var now = clock.currentInstant();
+
+    // First, try with a hash of the local network address.
+    // The node ID in this attempt is somewhat deterministic for the local machine.
+    // This is not really necessary, but it can reduce the overall number of ever-allocated node
+    // IDs.
+    try {
+      // Combines the hash codes of the interface addresses of all interfaces that are up and
+      // neither loopback, virtual, nor point-to-point. Interfaces that fail to be inspected
+      // contribute 0.
+      var hashOverNetworkInterfaces =
+          NetworkInterface.networkInterfaces()
+              .mapToInt(
+                  ni -> {
+                    try {
+                      if (ni.isUp()
+                          && !ni.isLoopback()
+                          && !ni.isVirtual()
+                          && !ni.isPointToPoint()) {
+                        return ni.getInterfaceAddresses().stream()
+                            .mapToInt(InterfaceAddress::hashCode)
+                            .reduce((a, b) -> 31 * a + b)
+                            .orElse(0);
+                      }
+                    } catch (SocketException e) {
+                      // ignore
+                    }
+                    return 0;
+                  })
+              .reduce((a, b) -> 31 * a + b)
+              .orElse(0);
+
+      // numNodeIds is validated to be a power of 2 in the constructor, so the mask yields a value
+      // in the range 0 <= nodeId < numNodeIds, also for negative hashes.
+      var nodeId = hashOverNetworkInterfaces & (numNodeIds - 1);
+
+      // A null result is treated as "candidate could not be leased".
+      var leased = tryLeaseFromCandidates(new int[] {nodeId}, now);
+      if (leased != null) {
+        return leased;
+      }
+    } catch (SocketException e) {
+      // ignore
+    }
+
+    // If the lease attempt using the hash over the network interfaces did not succeed, try with
+    // randomly picked node IDs.
+
+    // All node IDs, rotated by a random distance, then probed batch by batch.
+    var nodeIdsToCheck = IntStream.range(0, numNodeIds).toArray();
+    Ints.rotate(nodeIdsToCheck, 
ThreadLocalRandom.current().nextInt(numNodeIds));
+
+    // numNodeIds is a power of 2 and at least CHECK_BATCH_SIZE (validated in the constructor), so
+    // every batch is completely filled.
+    for (int i = 0; i < numNodeIds; i += CHECK_BATCH_SIZE) {
+      var ids = Arrays.copyOfRange(nodeIdsToCheck, i, i + CHECK_BATCH_SIZE);
+      var leased = tryLeaseFromCandidates(ids, now);
+      if (leased != null) {
+        return leased;
+      }
+    }
+
+    // No approach worked, give up - boom!
+    throw new IllegalStateException("Could not lease any node ID");
+  }
+
+  /**
+   * Marks this instance as closed and releases all leases that are registered with it.
+   *
+   * <p>All leases are attempted even if some of the releases fail. The first {@link
+   * RuntimeException} is rethrown after the loop, later ones are attached to it as suppressed
+   * exceptions.
+   */
+  @Override
+  @PreDestroy
+  public void close() {
+    LOGGER.debug("Closing NodeManagement");
+    closed = true;
+
+    // Work on a copy: NodeLeaseImpl.release() removes the lease from 'registeredLeases' and
+    // also checks against that collection.
+    var snapshot = new ArrayList<>(registeredLeases);
+    RuntimeException failure = null;
+    for (var lease : snapshot) {
+      try {
+        lease.release();
+      } catch (RuntimeException e) {
+        if (failure != null) {
+          failure.addSuppressed(e);
+        } else {
+          failure = e;
+        }
+      }
+    }
+
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * Attempts to lease one of the given node IDs, probing them in array order.
+   *
+   * @param nodeIds candidate node IDs
+   * @param now instant against which the fetched node states are checked for expiration
+   * @return the parameters of the acquired lease, or {@code null} if none of the candidates could
+   *     be leased
+   */
+  private LeaseParams tryLeaseFromCandidates(int[] nodeIds, Instant now) {
+    // NodeStore.fetchMany MUST return as many elements as the input array.
+    var states = nodeStore.fetchMany(nodeIds);
+    for (int idx = 0; idx < nodeIds.length; idx++) {
+      var state = states[idx];
+      if (!canLease(state, now)) {
+        continue;
+      }
+      var acquired = tryLease(nodeIds[idx], state);
+      if (acquired != null) {
+        return acquired;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Tells whether a node with the given state may be leased.
+   *
+   * @param node the fetched node state, {@code null} if there is none
+   * @param now the reference instant
+   * @return {@code true} if there is no state or its expiration timestamp is strictly before
+   *     {@code now}
+   */
+  private boolean canLease(NodeState node, Instant now) {
+    return node == null || node.expirationTimestamp().isBefore(now);
+  }
+
+  /**
+   * Attempts to acquire the lease for a single node ID via a conditional persist.
+   *
+   * @param nodeId the node ID to lease
+   * @param node the node state that is expected to be currently stored, {@code null} if none
+   * @return the parameters of the acquired lease, or {@code null} if the node store returned
+   *     {@code null} from the persist call
+   * @throws IllegalStateException if the node store returned a state different from the one that
+   *     was passed in
+   */
+  private LeaseParams tryLease(int nodeId, NodeState node) {
+    var leaseStart = clock.currentInstant();
+    var leaseEnd = leaseStart.plus(config.leaseDuration());
+
+    LOGGER.debug("Try lease node ID {} ...", nodeId);
+
+    var candidate =
+        ImmutableNodeState.builder()
+            .leaseTimestamp(leaseStart)
+            .expirationTimestamp(leaseEnd)
+            .build();
+    var stored = nodeStore.persist(nodeId, Optional.ofNullable(node), candidate);
+    if (stored == null) {
+      // No state persisted, this node ID is not ours.
+      return null;
+    }
+    checkState(
+        stored.equals(candidate),
+        "Internal error: value of persisted %s != to-persist %s",
+        stored,
+        candidate);
+
+    // Conditional insert/update succeeded - got the lease!
+    var renewAt = stored.expirationTimestamp().minus(config.renewalPeriod());
+    return new LeaseParams(nodeId, stored, renewAt);
+  }
+
+  /**
+   * Parameters describing an acquired node lease.
+   *
+   * @param nodeId the leased node ID
+   * @param nodeState the node state belonging to the lease
+   * @param renewLeaseTimestamp the instant at which the lease is due for renewal
+   * @param expirationTimestampMillis the lease expiration in milliseconds since the epoch
+   */
+  record LeaseParams(
+      int nodeId,
+      NodeState nodeState,
+      Instant renewLeaseTimestamp,
+      long expirationTimestampMillis) {
+    /** Derives {@code expirationTimestampMillis} from the expiration of {@code nodeState}. */
+    LeaseParams(int nodeId, NodeState nodeState, Instant renewLeaseTimestamp) {
+      this(nodeId, nodeState, renewLeaseTimestamp, 
nodeState.expirationTimestamp().toEpochMilli());
+    }
+  }
+
+  class NodeLeaseImpl implements NodeLease {
+    private volatile LeaseParams leaseParams;
+    private volatile Cancelable<Void> renewFuture;
+    private final Lock lock = new ReentrantLock();
+
+    /** Creates the lease object for the given parameters by delegating to {@code newLease}. */
+    NodeLeaseImpl(LeaseParams leaseParams) {
+      newLease(leaseParams);
+    }
+
+    /**
+     * Installs the given lease parameters as the current lease and schedules the renewal task
+     * for the renewal timestamp of the lease.
+     *
+     * @param leaseParams parameters of the freshly acquired lease
+     */
+    private void newLease(LeaseParams leaseParams) {
+      this.leaseParams = leaseParams;
+
+      var millisUntilRenewal =
+          leaseParams.renewLeaseTimestamp.toEpochMilli() - clock.currentTimeMillis();
+      var untilRenewal = Duration.ofMillis(millisUntilRenewal);
+      reschedule(untilRenewal);
+
+      LOGGER.info(
+          "New lease for node#{} acquired, valid until {} (renewal scheduled in {}).",
+          leaseParams.nodeId,
+          leaseParams.nodeState.expirationTimestamp(),
+          untilRenewal.truncatedTo(ChronoUnit.SECONDS));
+    }
+
+    /**
+     * Schedules {@code renewAndReschedule} to run after the given delay and remembers the
+     * resulting handle in {@code renewFuture}.
+     *
+     * @param delay time to wait before the renewal task runs
+     */
+    private void reschedule(Duration delay) {
+      // 'delay' can safely be negative, which is handled by the scheduler.
+      this.renewFuture = scheduler.schedule(this::renewAndReschedule, delay);
+    }
+
+    /**
+     * Body of the scheduled renewal task.
+     *
+     * <p>Does nothing if the owning instance is closed or the lease has been released. Otherwise:
+     *
+     * <ul>
+     *   <li>If the lease is valid for longer than {@code RENEWAL_MIN_LEFT_FOR_RENEWAL}, the lease
+     *       is renewed and the next renewal is scheduled.
+     *   <li>If the renewal fails and the lease is valid for longer than {@code
+     *       RESCHEDULE_UNTIL_EXPIRATION}, the renewal is retried after {@code
+     *       RESCHEDULE_AFTER_FAILURE}.
+     *   <li>In all other cases a new lease is acquired. If that fails as well, the task is retried
+     *       after {@code RESCHEDULE_AFTER_FAILURE}.
+     * </ul>
+     */
+    private void renewAndReschedule() {
+      if (closed) {
+        return;
+      }
+      lock.lock();
+      try {
+        var lp = leaseParams;
+        if (lp == null) {
+          // The lease has been released.
+          return;
+        }
+        var id = lp.nodeId;
+        var validFor = Duration.ofMillis(lp.expirationTimestampMillis - clock.currentTimeMillis());
+        if (validFor.compareTo(RENEWAL_MIN_LEFT_FOR_RENEWAL) > 0) {
+          LOGGER.debug("Renewing lease for node#{}...", id);
+          try {
+            // The call to 'renewInternal()' can throw an ISE if this call races w/
+            // NodeManagement.close().
+            // In that case, it's just an annoyance, nothing more.
+            var newRenewalTimestamp = renewInternal(lp);
+            // 'renewInternal()' returns the next renewal instant, not the expiration. The
+            // expiration is taken from the lease parameters that 'renewInternal()' just installed.
+            LOGGER.info(
+                "Lease for node#{} renewed until {} (next renewal at {}).",
+                id,
+                leaseParams.nodeState.expirationTimestamp(),
+                newRenewalTimestamp);
+            reschedule(
+                Duration.ofMillis(newRenewalTimestamp.toEpochMilli() - clock.currentTimeMillis()));
+            return;
+          } catch (Exception e) {
+            if (validFor.compareTo(RESCHEDULE_UNTIL_EXPIRATION) > 0) {
+              LOGGER.warn(
+                  "Could not renew lease for node#{} (lease still valid for {}), retrying in {}",
+                  id,
+                  validFor,
+                  RESCHEDULE_AFTER_FAILURE,
+                  e);
+              reschedule(RESCHEDULE_AFTER_FAILURE);
+              return;
+            } else {
+              LOGGER.warn(
+                  "Could not renew lease for node#{} (lease still valid for {}), attempting to get a new lease ...",
+                  id,
+                  validFor,
+                  e);
+            }
+          }
+        } else {
+          LOGGER.warn(
+              "Not renewing node lease for node#{}, because the lease is currently no longer valid. "
+                  + "The node lease should have been valid, but was not. This may happen if the node was massively overloaded.",
+              id);
+        }
+
+        // Renewal was not possible, fall back to acquiring a completely new lease.
+        try {
+          var newLease = leaseInternal();
+          newLease(newLease);
+        } catch (Exception e) {
+          LOGGER.warn(
+              "Could not get a new lease, reattempting in {}", RESCHEDULE_AFTER_FAILURE, e);
+          reschedule(RESCHEDULE_AFTER_FAILURE);
+        }
+
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Returns the leased node ID, but only while the lease is held and not expired.
+     *
+     * @return the node ID, or {@code -1} if the lease has been released or its expiration
+     *     timestamp is not after the current clock value
+     */
+    @Override
+    public int nodeIdIfValid() {
+      var current = leaseParams;
+      if (current != null && current.expirationTimestampMillis > clock.currentTimeMillis()) {
+        return current.nodeId;
+      }
+      return -1;
+    }
+
+    /**
+     * Returns a snapshot of the current lease as a {@link Node}.
+     *
+     * @return the node description, or {@code null} if the lease has been released
+     */
+    @Override
+    public Node node() {
+      var current = leaseParams;
+      if (current == null) {
+        return null;
+      }
+      var state = current.nodeState;
+      return ImmutableNode.builder()
+          .id(current.nodeId)
+          .expirationTimestamp(state.expirationTimestamp())
+          .leaseTimestamp(state.leaseTimestamp())
+          .renewLeaseTimestamp(current.renewLeaseTimestamp)
+          .build();
+    }
+
+    /**
+     * Releases this lease: unregisters it, cancels the scheduled renewal and persists the node
+     * state with the expiration timestamp set to the current instant.
+     *
+     * <p>Calling this method on a lease that is no longer registered is a no-op.
+     *
+     * @throws IllegalStateException if the node store does not return the state that was passed
+     *     in for the released node
+     */
+    @Override
+    public void release() {
+      lock.lock();
+      try {
+        // The removal doubles as the "already released" check, only the first call proceeds.
+        if (!registeredLeases.remove(this)) {
+          return;
+        }
+
+        // Shall never be null, but be safe here.
+        if (renewFuture != null) {
+          renewFuture.cancel();
+        }
+
+        var lp = leaseParams;
+        if (lp == null) {
+          return;
+        }
+        var id = lp.nodeId;
+        LOGGER.info("Releasing lease for node id {}", id);
+
+        var now = clock.currentInstant();
+        var activeState = lp.nodeState;
+        var nodeAsReleased =
+            
ImmutableNodeState.builder().from(activeState).expirationTimestamp(now).build();
+
+        // Cleared before persisting, so this lease reports as released even if the persist call
+        // or the state check below throws.
+        leaseParams = null;
+
+        var updated = nodeStore.persist(id, Optional.of(activeState), 
nodeAsReleased);
+        checkState(
+            updated != null && updated.equals(nodeAsReleased),
+            "State of the node %s has been unexpectedly changed: value of 
persisted %s != to-persist %s",
+            id,
+            updated,
+            nodeAsReleased);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Renews this lease immediately via {@code renewInternal}. The scheduled renewal task is not
+     * touched by this method.
+     *
+     * @throws IllegalStateException if the lease has already been released, or if {@code
+     *     renewInternal} fails one of its state checks
+     */
+    @Override
+    public void renew() {
+      lock.lock();
+      try {
+        var lp = leaseParams;
+        checkState(lp != null, "Lease has already been released");
+        renewInternal(lp);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Persists a new expiration timestamp for the leased node and installs the resulting lease
+     * parameters as the current ones.
+     *
+     * <p>The new expiration is the current instant plus the configured lease duration. The
+     * original lease timestamp is retained.
+     *
+     * @param lp the current lease parameters, whose node state is passed to the node store as the
+     *     expected state
+     * @return the instant at which the next renewal is due, which is the new expiration minus the
+     *     configured renewal period
+     * @throws IllegalStateException if the node store returns {@code null} or a state different
+     *     from the one that was passed in
+     */
+    private Instant renewInternal(LeaseParams lp) {
+      var id = lp.nodeId;
+      LOGGER.debug("Renewing lease for node id {}", id);
+
+      var now = clock.currentInstant();
+      var expire = now.plus(config.leaseDuration());
+      var renewal = expire.minus(config.renewalPeriod());
+
+      var newNode =
+          ImmutableNodeState.builder()
+              .leaseTimestamp(lp.nodeState.leaseTimestamp())
+              .expirationTimestamp(expire)
+              .build();
+
+      var persisted = nodeStore.persist(id, Optional.of(lp.nodeState), 
newNode);
+      checkState(
+          persisted != null,
+          "State of the NodeState for nodeId %s has been unexpectedly 
changed!",
+          id);
+      checkState(
+          persisted.equals(newNode),
+          "Internal error: value of persisted %s != to-persist %s",
+          persisted,
+          newNode);
+
+      // Only replace the current lease parameters after the persist call has been verified.
+      leaseParams = new LeaseParams(id, persisted, renewal);
+
+      return renewal;
+    }
+  }
+}
diff --git 
a/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/Util.java
 
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/Util.java
new file mode 100644
index 000000000..eaf24887e
--- /dev/null
+++ 
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/Util.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.ImmutableBuildableIdGeneratorSpec;
+import org.apache.polaris.nodeids.spi.NodeManagementState;
+
+public final class Util {
+  private Util() {}
+
+  @VisibleForTesting
+  public static IdGeneratorSpec idgenSpecFromManagementState(
+      Optional<NodeManagementState> existingNodeManagementState) {
+    return existingNodeManagementState
+        .orElseThrow()
+        .idGeneratorSpec()
+        .orElse(ImmutableBuildableIdGeneratorSpec.builder().build());
+  }
+}
diff --git 
a/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/package-info.java
 
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/package-info.java
new file mode 100644
index 000000000..549d09dc5
--- /dev/null
+++ 
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/** Node management implementation, do not directly use the types in this 
package. */
+package org.apache.polaris.nodeids.impl;
diff --git a/persistence/nosql/nodes/impl/src/main/resources/META-INF/beans.xml 
b/persistence/nosql/nodes/impl/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ b/persistence/nosql/nodes/impl/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git 
a/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeLeases.java
 
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeLeases.java
new file mode 100644
index 000000000..ae79f8914
--- /dev/null
+++ 
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeLeases.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import static java.time.temporal.ChronoUnit.MILLIS;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.polaris.nodeids.impl.NodeManagementImpl.RESCHEDULE_AFTER_FAILURE;
+import static org.assertj.core.api.InstanceOfAssertFactories.BOOLEAN;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.nodeids.api.NodeManagementConfig;
+import org.apache.polaris.nodeids.spi.NodeState;
+import org.apache.polaris.nosql.async.MockAsyncExec;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests the renewal and re-lease behavior of node leases, driven by a mutable clock and a mock
+ * scheduler, so that scheduled tasks are executed explicitly by the test.
+ */
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestNodeLeases {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  MockAsyncExec scheduler;
+  MutableMonotonicClock clock;
+
+  /** Creates a fresh clock and a scheduler bound to that clock for every test. */
+  @BeforeEach
+  void beforeEach() {
+    clock = new MutableMonotonicClock();
+    scheduler = new MockAsyncExec(clock);
+  }
+
+  /**
+   * Verifies that a lease is renewed when the renewal task runs on time, and that a new lease
+   * with a different node ID is acquired when the renewal task only runs after the lease expired.
+   */
+  @Test
+  public void leaseAgainAfterStalledExecutor() {
+    var config =
+        NodeManagementConfig.BuildableNodeManagementConfig.builder()
+            
.idGeneratorSpec(IdGeneratorSpec.BuildableIdGeneratorSpec.builder().build())
+            .build();
+    var renewInterval = config.leaseDuration().minus(config.renewalPeriod());
+    try (var mgmt = new NodeManagementImpl(config, clock, new 
MockNodeStoreFactory(), scheduler)) {
+      mgmt.init();
+
+      soft.assertThat(scheduler.tasks()).isEmpty();
+
+      var lease = mgmt.lease();
+      var node = requireNonNull(lease.node());
+      soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+
+      // Leasing schedules exactly one renewal task, due after 'renewInterval'
+      soft.assertThat(scheduler.tasks())
+          .hasSize(1)
+          .first()
+          .extracting(MockAsyncExec.Task::nextExecution)
+          .isEqualTo(clock.currentInstant().plus(renewInterval));
+      soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+
+      soft.assertThat(scheduler.nextReady()).isEmpty();
+
+      // Run the renewal task on time
+      clock.advanceBoth(renewInterval);
+      soft.assertThat(scheduler.nextReady())
+          .map(MockAsyncExec.Task::call)
+          .get()
+          .extracting(MockAsyncExec.CallResult::called, BOOLEAN)
+          .isTrue();
+
+      // The next renewal is due exactly one 'renewInterval' later
+      soft.assertThat(
+              
scheduler.readyCount(clock.currentInstant().plus(renewInterval).minus(1, 
MILLIS)))
+          .isEqualTo(0L);
+      
soft.assertThat(scheduler.readyCount(clock.currentInstant().plus(renewInterval)))
+          .isEqualTo(1L);
+
+      // Same node ID, expiration extended by 'renewInterval'
+      soft.assertThat(requireNonNull(lease.node()).id()).isEqualTo(node.id());
+      soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+          .isEqualTo(node.expirationTimestamp().plus(renewInterval));
+
+      // Advance the clock _after/at_ expiration, so the node-lease has to re-lease
+
+      clock.advanceBoth(config.leaseDuration());
+      soft.assertThat(lease.nodeIdIfValid()).isEqualTo(-1);
+
+      soft.assertThat(scheduler.nextReady())
+          .map(MockAsyncExec.Task::call)
+          .get()
+          .extracting(MockAsyncExec.CallResult::called, BOOLEAN)
+          .isTrue();
+
+      // A new lease with a different node ID has been acquired
+      soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+      
soft.assertThat(requireNonNull(lease.node()).id()).isNotEqualTo(node.id());
+      soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+          .isEqualTo(clock.currentInstant().plus(config.leaseDuration()));
+    }
+    // Closing the node management leaves no scheduled tasks behind
+    soft.assertThat(scheduler.tasks()).isEmpty();
+  }
+
+  /**
+   * Verifies that the renewal task reschedules itself after {@code RESCHEDULE_AFTER_FAILURE} when
+   * the node store fails to persist, both during a renewal and during a re-lease.
+   */
+  @Test
+  public void rescheduleWorksAfterPersistFailures() {
+    var config =
+        NodeManagementConfig.BuildableNodeManagementConfig.builder()
+            
.idGeneratorSpec(IdGeneratorSpec.BuildableIdGeneratorSpec.builder().build())
+            .build();
+    var renewInterval = config.leaseDuration().minus(config.renewalPeriod());
+    // One-shot failure switch: the next 'persist' call throws and resets the flag
+    var persistFailure = new AtomicBoolean(false);
+    var mockStore =
+        new MockNodeStore() {
+          @Nullable
+          @Override
+          public NodeState persist(
+              int nodeId, Optional<NodeState> expectedNodeState, @Nonnull 
NodeState newState) {
+            if (persistFailure.compareAndSet(true, false)) {
+              throw new RuntimeException("forced persist failure");
+            }
+            return super.persist(nodeId, expectedNodeState, newState);
+          }
+        };
+    try (var mgmt =
+        new NodeManagementImpl(config, clock, new 
MockNodeStoreFactory(mockStore), scheduler)) {
+      mgmt.init();
+
+      soft.assertThat(scheduler.tasks()).isEmpty();
+
+      var lease = mgmt.lease();
+      var node = requireNonNull(lease.node());
+
+      soft.assertThat(scheduler.tasks())
+          .hasSize(1)
+          .first()
+          .extracting(MockAsyncExec.Task::nextExecution)
+          .isEqualTo(clock.currentInstant().plus(renewInterval));
+
+      soft.assertThat(scheduler.nextReady()).isEmpty();
+
+      clock.advanceBoth(renewInterval);
+
+      // Force a failure during renewal
+      persistFailure.set(true);
+
+      // The task itself must not fail, the persist failure is handled inside the task
+      soft.assertThat(scheduler.nextReady())
+          .map(MockAsyncExec.Task::call)
+          .get()
+          .extracting(MockAsyncExec.CallResult::called, 
MockAsyncExec.CallResult::failure)
+          .containsExactly(true, null);
+
+      // The lease is unchanged after the failed renewal
+      soft.assertThat(lease.node()).isEqualTo(node);
+
+      // The retry is scheduled exactly RESCHEDULE_AFTER_FAILURE later
+      soft.assertThat(
+              scheduler.readyCount(
+                  
clock.currentInstant().plus(RESCHEDULE_AFTER_FAILURE).minus(1, MILLIS)))
+          .isEqualTo(0L);
+      
soft.assertThat(scheduler.readyCount(clock.currentInstant().plus(RESCHEDULE_AFTER_FAILURE)))
+          .isEqualTo(1L);
+
+      clock.advanceBoth(RESCHEDULE_AFTER_FAILURE);
+
+      // renewal working
+
+      soft.assertThat(scheduler.nextReady())
+          .map(MockAsyncExec.Task::call)
+          .get()
+          .extracting(MockAsyncExec.CallResult::called, 
MockAsyncExec.CallResult::failure)
+          .containsExactly(true, null);
+
+      soft.assertThat(requireNonNull(lease.node()).id()).isEqualTo(node.id());
+      soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+          
.isEqualTo(node.expirationTimestamp().plus(RESCHEDULE_AFTER_FAILURE).plus(renewInterval));
+
+      node = lease.node();
+
+      // simulate a persistence failure during re-lease operation
+
+      clock.advanceBoth(config.leaseDuration());
+      persistFailure.set(true);
+      soft.assertThat(scheduler.nextReady())
+          .map(MockAsyncExec.Task::call)
+          .get()
+          .extracting(MockAsyncExec.CallResult::called, 
MockAsyncExec.CallResult::failure)
+          .containsExactly(true, null);
+      // The expired lease is reported as invalid, but its node information is unchanged
+      soft.assertThat(lease.nodeIdIfValid()).isEqualTo(-1);
+      soft.assertThat(lease.node()).isEqualTo(node);
+
+      clock.advanceBoth(RESCHEDULE_AFTER_FAILURE);
+
+      // The retried re-lease succeeds
+      soft.assertThat(scheduler.nextReady())
+          .map(MockAsyncExec.Task::call)
+          .get()
+          .extracting(MockAsyncExec.CallResult::called, 
MockAsyncExec.CallResult::failure)
+          .containsExactly(true, null);
+
+      soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+      soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+          .isEqualTo(clock.currentInstant().plus(config.leaseDuration()));
+
+      soft.assertThat(scheduler.nextReady()).isEmpty();
+
+      // Releasing the lease cancels the scheduled renewal
+      lease.release();
+
+      soft.assertThat(scheduler.tasks()).isEmpty();
+    }
+    soft.assertThat(scheduler.tasks()).isEmpty();
+  }
+}
diff --git 
a/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeManagementImpl.java
 
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeManagementImpl.java
new file mode 100644
index 000000000..2433d412d
--- /dev/null
+++ 
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeManagementImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+
+import jakarta.annotation.Nonnull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.ImmutableIdGeneratorSpec;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.impl.MonotonicClockImpl;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.nodeids.api.NodeLease;
+import org.apache.polaris.nodeids.api.NodeManagementConfig;
+import org.apache.polaris.nodeids.spi.ImmutableBuildableNodeManagementState;
+import org.apache.polaris.nodeids.spi.NodeManagementState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+import org.apache.polaris.nodeids.spi.NodeStoreFactory;
+import org.apache.polaris.nosql.async.AsyncExec;
+import org.apache.polaris.nosql.async.java.JavaPoolAsyncExec;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests {@link NodeManagementImpl} against mock node stores, using a {@link JavaPoolAsyncExec}
+ * scheduler.
+ */
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestNodeManagementImpl {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  AsyncExec scheduler;
+  MonotonicClock clock;
+
+  /** Creates a default clock and a scheduler for every test. */
+  @BeforeEach
+  void beforeEach() {
+    clock = MonotonicClockImpl.newDefaultInstance();
+    scheduler = new JavaPoolAsyncExec();
+  }
+
+  /** Closes the scheduler and the clock created in {@link #beforeEach()}. */
+  @AfterEach
+  void afterEach() throws Exception {
+    ((AutoCloseable) scheduler).close();
+    clock.close();
+  }
+
+  /**
+   * Verifies that a configured ID generator specification that differs from the one returned by
+   * the node store factory triggers the incompatibility warning, and that the generator built
+   * afterwards uses the default bit sizes instead of the configured ones.
+   */
+  @Test
+  public void incompatibleNodeManagementConfig() {
+    var incompatibleExistingState =
+        ImmutableBuildableNodeManagementState.builder()
+            .idGeneratorSpec(
+                ImmutableIdGeneratorSpec.builder()
+                    .type("snowflake")
+                    .params(Map.of("offset", "2024-11-21T12:44:51.134Z"))
+                    .build())
+            .build();
+
+    var config =
+        NodeManagementConfig.BuildableNodeManagementConfig.builder()
+            .idGeneratorSpec(
+                ImmutableIdGeneratorSpec.builder()
+                    .type("snowflake")
+                    .params(
+                        Map.of(
+                            "timestamp-bits",
+                            "1",
+                            "node-id-bits",
+                            "2",
+                            "sequence-bits",
+                            "3",
+                            "offset",
+                            "2024-11-21T12:44:51.134Z"))
+                    .build())
+            .build();
+    // Set by the overridden warning hook below
+    var incompatible = new AtomicBoolean(false);
+    try (var mgmt =
+        new NodeManagementImpl(
+            config,
+            clock,
+            new NodeStoreFactory() {
+              @Nonnull
+              @Override
+              public NodeStore createNodeStore(@Nonnull IdGenerator 
idGenerator) {
+                return new MockNodeStore();
+              }
+
+              @Override
+              public Optional<NodeManagementState> fetchManagementState() {
+                return Optional.of(incompatibleExistingState);
+              }
+
+              @Override
+              public boolean storeManagementState(@Nonnull NodeManagementState 
state) {
+                throw new UnsupportedOperationException();
+              }
+            },
+            scheduler) {
+          @Override
+          void warnOnIncompatibleIdGeneratorSpec(IdGeneratorSpec spec) {
+            incompatible.set(true);
+            super.warnOnIncompatibleIdGeneratorSpec(spec);
+          }
+        }) {
+      mgmt.init();
+
+      soft.assertThat(incompatible).isTrue();
+      var node = mgmt.lease();
+      var idGen = mgmt.buildIdGenerator(node);
+      soft.assertThat(idGen)
+          .isInstanceOf(SnowflakeIdGenerator.class)
+          .asInstanceOf(type(SnowflakeIdGenerator.class))
+          .extracting(
+              SnowflakeIdGenerator::timestampBits,
+              SnowflakeIdGenerator::nodeIdBits,
+              SnowflakeIdGenerator::sequenceBits)
+          .containsExactly(
+              SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS,
+              SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS,
+              SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS);
+    }
+  }
+
+  /** Verifies that a single lease can be acquired and yields a valid node ID. */
+  @Test
+  public void simple() {
+    var config =
+        NodeManagementConfig.BuildableNodeManagementConfig.builder()
+            .idGeneratorSpec(
+                ImmutableIdGeneratorSpec.builder()
+                    .type("snowflake")
+                    .params(Map.of("offset", "2024-11-21T12:44:51.134Z"))
+                    .build())
+            .build();
+    try (var mgmt = new NodeManagementImpl(config, clock, new 
MockNodeStoreFactory(), scheduler)) {
+      mgmt.init();
+
+      soft.assertThat(mgmt.maxNumberOfNodes()).isEqualTo(config.numNodes());
+      var lease = mgmt.lease();
+      soft.assertThat(lease).isNotNull();
+      soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+    }
+  }
+
+  /**
+   * Verifies that all node IDs can be leased, that further lease attempts fail once all IDs are
+   * taken, that released IDs can only be leased again after the clock advanced, and that leasing
+   * from a closed instance fails.
+   */
+  @Test
+  public void allocateAll() {
+    NodeManagementImpl m;
+    var config =
+        NodeManagementConfig.BuildableNodeManagementConfig.builder()
+            .idGeneratorSpec(
+                ImmutableIdGeneratorSpec.builder()
+                    .type("snowflake")
+                    .params(Map.of("offset", "2024-11-21T12:44:51.134Z"))
+                    .build())
+            .build();
+    try (var mutableClock = new MutableMonotonicClock();
+        var mgmt =
+            new NodeManagementImpl(config, mutableClock, new 
MockNodeStoreFactory(), scheduler)) {
+      mgmt.init();
+
+      // Lease every available node ID
+      var numNodeIds = 1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS;
+      var leases = new ArrayList<NodeLease>();
+      for (int i = 0; i < numNodeIds; i++) {
+        soft.assertThatCode(() -> leases.add(mgmt.lease()))
+            .describedAs("n = %d", i)
+            .doesNotThrowAnyException();
+      }
+      // No node ID left
+      soft.assertThatIllegalStateException()
+          .isThrownBy(mgmt::lease)
+          .withMessage("Could not lease any node ID");
+
+      soft.assertThat(leases).hasSize(numNodeIds);
+
+      for (var lease : leases) {
+        soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+        // A node is not valid at its own expiration timestamp
+        soft.assertThat(
+                requireNonNull(lease.node())
+                    .valid(lease.node().expirationTimestamp().toEpochMilli()))
+            .isFalse();
+      }
+
+      // Release all leases
+
+      for (var node : leases) {
+        node.release();
+        soft.assertThat(node.nodeIdIfValid()).isEqualTo(-1);
+      }
+
+      leases.clear();
+
+      // Lease fails, because the clock is not 'after' the 'expirationTimestamp'
+
+      soft.assertThatIllegalStateException()
+          .isThrownBy(mgmt::lease)
+          .withMessage("Could not lease any node ID");
+
+      // Advance clock
+
+      mutableClock.advanceBoth(Duration.ofSeconds(1));
+
+      // Repeat allocation of all nodes
+
+      for (int i = 0; i < numNodeIds; i++) {
+        soft.assertThatCode(() -> leases.add(mgmt.lease()))
+            .describedAs("n = %d", i)
+            .doesNotThrowAnyException();
+      }
+      soft.assertThatIllegalStateException()
+          .isThrownBy(mgmt::lease)
+          .withMessage("Could not lease any node ID");
+
+      soft.assertThat(leases).hasSize(numNodeIds);
+
+      // Keep a reference to verify the behavior after close()
+      m = mgmt;
+    }
+
+    soft.assertThatIllegalStateException()
+        .isThrownBy(m::lease)
+        .withMessage("NodeManagement instance has been closed");
+  }
+}
diff --git a/persistence/nosql/nodes/impl/src/test/resources/logback-test.xml 
b/persistence/nosql/nodes/impl/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..4a4d9a629
--- /dev/null
+++ b/persistence/nosql/nodes/impl/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!-- Logback configuration used by the tests of this module: all output goes to the console. -->
+<configuration debug="false">
+  <!-- NOTE(review): per the logback documentation, LevelChangePropagator forwards logback level
+       changes to java.util.logging; confirm this is the intent here. -->
+  <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/>
+  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <!-- ISO-8601 timestamp, thread name, level, logger name (%logger{36}), message -->
+      <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <!-- The root level is taken from the 'test.log.level' property and falls back to INFO. -->
+  <root level="${test.log.level:-INFO}">
+    <appender-ref ref="console"/>
+  </root>
+</configuration>
diff --git a/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStore.java b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStore.java
new file mode 100644
index 000000000..912069276
--- /dev/null
+++ b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStore.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.polaris.nodeids.spi.NodeState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+
+/**
+ * In-memory {@link NodeStore} that keeps all node states in a {@link ConcurrentHashMap} keyed by
+ * node ID.
+ */
+public class MockNodeStore implements NodeStore {
+  private final Map<Integer, NodeState> nodeStates = new ConcurrentHashMap<>();
+
+  /**
+   * Conditionally stores {@code newState} for {@code nodeId}.
+   *
+   * <p>If {@code nodeId} is not negative and {@code expectedNodeState} is present, an existing
+   * state is replaced only if it {@code equals} the expected state; nothing is stored if there is
+   * no existing state. In all other cases {@code newState} is stored only if no state exists for
+   * {@code nodeId} yet.
+   *
+   * @param nodeId ID of the node to update
+   * @param expectedNodeState state that must currently be stored for the update to be applied
+   * @param newState state to store
+   * @return {@code newState} if it was stored, {@code null} if the precondition did not hold
+   */
+  @Nullable
+  @Override
+  public NodeState persist(
+      int nodeId, Optional<NodeState> expectedNodeState, @Nonnull NodeState newState) {
+    if (nodeId >= 0 && expectedNodeState.isPresent()) {
+      // Compare-and-replace, evaluated atomically for the key by the concurrent map.
+      var result =
+          nodeStates.computeIfPresent(
+              nodeId,
+              (key, existing) -> {
+                if (expectedNodeState.get().equals(existing)) {
+                  return newState;
+                }
+                return existing;
+              });
+      // Identity check: the remapping function returns the 'newState' instance itself only when
+      // the expected state matched. 'result' is null when no state was stored for 'nodeId'.
+      if (result == newState) {
+        return newState;
+      }
+    } else {
+      // No expected state (or negative ID): only create the entry, never overwrite one.
+      if (nodeStates.putIfAbsent(nodeId, newState) == null) {
+        return newState;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Looks up the states of several node IDs at once.
+   *
+   * @param nodeIds IDs to look up
+   * @return array with the same length and order as {@code nodeIds}; an element is {@code null} if
+   *     the corresponding ID is negative or has no stored state
+   */
+  @Nonnull
+  @Override
+  public NodeState[] fetchMany(@Nonnull int... nodeIds) {
+    var r = new NodeState[nodeIds.length];
+    for (int i = 0; i < nodeIds.length; i++) {
+      var id = nodeIds[i];
+      // Negative IDs are not looked up, their slot stays null.
+      if (id >= 0) {
+        r[i] = nodeStates.get(id);
+      }
+    }
+    return r;
+  }
+
+  /**
+   * Looks up the state of a single node ID.
+   *
+   * @param nodeId ID to look up
+   * @return the stored state, or an empty {@link Optional} if none is stored for {@code nodeId}
+   */
+  @Override
+  public Optional<NodeState> fetch(int nodeId) {
+    return Optional.ofNullable(nodeStates.get(nodeId));
+  }
+}
diff --git a/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStoreFactory.java b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStoreFactory.java
new file mode 100644
index 000000000..48ce35094
--- /dev/null
+++ b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStoreFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.nodeids.spi.NodeManagementState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+import org.apache.polaris.nodeids.spi.NodeStoreFactory;
+
+/**
+ * {@link NodeStoreFactory} that keeps the management state in memory and always hands out the same
+ * {@link NodeStore} instance.
+ */
+public class MockNodeStoreFactory implements NodeStoreFactory {
+  // Holds the management state; null until a state has been stored.
+  private final AtomicReference<NodeManagementState> nodeState = new AtomicReference<>();
+  private final NodeStore nodeStore;
+
+  /** Creates a factory backed by a new {@link MockNodeStore}. */
+  public MockNodeStoreFactory() {
+    this(new MockNodeStore());
+  }
+
+  /**
+   * Creates a factory backed by the given node store.
+   *
+   * @param nodeStore instance returned by every call to {@link #createNodeStore(IdGenerator)}
+   */
+  public MockNodeStoreFactory(NodeStore nodeStore) {
+    this.nodeStore = nodeStore;
+  }
+
+  /**
+   * Stores the management state, but only if none has been stored before.
+   *
+   * @param state management state to store
+   * @return {@code true} if {@code state} was stored, {@code false} if a state already existed
+   */
+  @Override
+  public boolean storeManagementState(@Nonnull NodeManagementState state) {
+    return nodeState.compareAndSet(null, state);
+  }
+
+  /**
+   * Returns the stored management state.
+   *
+   * @return the stored state, or an empty {@link Optional} if nothing has been stored yet
+   */
+  @Override
+  public Optional<NodeManagementState> fetchManagementState() {
+    return Optional.ofNullable(nodeState.get());
+  }
+
+  /**
+   * Returns the node store this factory was constructed with.
+   *
+   * @param idGenerator not used by this implementation
+   * @return the same {@link NodeStore} instance on every call
+   */
+  @Nonnull
+  @Override
+  public NodeStore createNodeStore(@Nonnull IdGenerator idGenerator) {
+    return nodeStore;
+  }
+}
diff --git a/persistence/nosql/nodes/spi/build.gradle.kts b/persistence/nosql/nodes/spi/build.gradle.kts
new file mode 100644
index 000000000..6a62d205a
--- /dev/null
+++ b/persistence/nosql/nodes/spi/build.gradle.kts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Build script of the "Polaris nodes SPI" module.
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris nodes SPI"
+
+dependencies {
+  implementation(project(":polaris-idgen-api"))
+  implementation(project(":polaris-nodes-api"))
+
+  // Immutables: compile-time only dependency plus its annotation processor configuration
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = "processor"))
+
+  // The Jackson version is managed by the BOM, so jackson-databind is declared without a version
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+
+  // Jakarta APIs are only needed at compile time
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+}
diff --git a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeManagementState.java b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeManagementState.java
new file mode 100644
index 000000000..558b1e1b6
--- /dev/null
+++ b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeManagementState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Node management state, as stored and fetched via {@link NodeStoreFactory}.
+ *
+ * <p>NOTE(review): presumably shared by all nodes of a cluster; confirm against the node
+ * management implementation.
+ */
+public interface NodeManagementState {
+  /**
+   * ID generator specification, if any.
+   *
+   * <p>NOTE(review): presumably the specification the ID generator is built from; confirm.
+   */
+  Optional<IdGeneratorSpec> idGeneratorSpec();
+
+  /**
+   * {@link NodeManagementState} sub-interface annotated with {@code @PolarisImmutable}.
+   *
+   * <p>NOTE(review): presumably exists so that the annotation processor generates an immutable
+   * implementation; confirm.
+   */
+  @PolarisImmutable
+  interface BuildableNodeManagementState extends NodeManagementState {}
+}
diff --git a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeState.java b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeState.java
new file mode 100644
index 000000000..a461c39fc
--- /dev/null
+++ b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import java.time.Instant;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** State of a single node ID, as read and written through {@link NodeStore}. */
+@PolarisImmutable
+public interface NodeState {
+  /**
+   * Timestamp of the lease.
+   *
+   * <p>NOTE(review): presumably the instant at which the lease was acquired; confirm.
+   */
+  Instant leaseTimestamp();
+
+  /** Timestamp since which this node's lease is no longer valid. */
+  Instant expirationTimestamp();
+}
diff --git a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStore.java b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStore.java
new file mode 100644
index 000000000..8950be1b7
--- /dev/null
+++ b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Optional;
+
+/** Abstraction of nodes persistence for the node management implementation. */
+public interface NodeStore {
+  /**
+   * Fetches the state of a single node ID.
+   *
+   * @param nodeId ID of the node to fetch
+   * @return the node's state; NOTE(review): presumably empty if no state is stored for {@code
+   *     nodeId}, as implemented by {@code MockNodeStore} - confirm
+   */
+  Optional<NodeState> fetch(int nodeId);
+
+  /**
+   * Fetches the states of multiple node IDs.
+   *
+   * @param nodeIds IDs of the nodes to fetch
+   * @return array of node states; NOTE(review): {@code MockNodeStore} returns one element per
+   *     requested ID, in the same order, with {@code null} for IDs without a state - presumably
+   *     the general contract, confirm
+   */
+  @Nonnull
+  NodeState[] fetchMany(@Nonnull int... nodeIds);
+
+  /**
+   * Persists {@code newState} for {@code nodeId}, conditional on {@code expectedNodeState}.
+   *
+   * <p>NOTE(review): {@code MockNodeStore} replaces an existing state only if it equals the
+   * expected state, and creates a new entry only if none exists; presumably all implementations
+   * must behave like this - confirm.
+   *
+   * @param nodeId ID of the node to update
+   * @param expectedNodeState state expected to be currently persisted
+   * @param newState state to persist
+   * @return may be {@code null}; NOTE(review): {@code MockNodeStore} returns {@code newState} if
+   *     it was persisted and {@code null} otherwise - confirm
+   */
+  @Nullable
+  NodeState persist(int nodeId, Optional<NodeState> expectedNodeState, @Nonnull NodeState newState);
+}
diff --git a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStoreFactory.java b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStoreFactory.java
new file mode 100644
index 000000000..c59db0ed0
--- /dev/null
+++ b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStoreFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGenerator;
+
+/** Creates {@link NodeStore} instances and gives access to the {@link NodeManagementState}. */
+public interface NodeStoreFactory {
+  /**
+   * Creates a node store.
+   *
+   * @param idGenerator ID generator made available to the node store; NOTE(review): how
+   *     implementations use it is not visible here - confirm
+   * @return the node store, never {@code null}
+   */
+  @Nonnull
+  NodeStore createNodeStore(@Nonnull IdGenerator idGenerator);
+
+  /**
+   * Fetches the node management state.
+   *
+   * @return the management state; NOTE(review): presumably empty if none has been stored yet, as
+   *     implemented by {@code MockNodeStoreFactory} - confirm
+   */
+  Optional<NodeManagementState> fetchManagementState();
+
+  /**
+   * Stores the node management state.
+   *
+   * @param state management state to store
+   * @return NOTE(review): {@code MockNodeStoreFactory} stores the state only if none was stored
+   *     before and returns whether it did so - presumably the general contract, confirm
+   */
+  boolean storeManagementState(@Nonnull NodeManagementState state);
+}

Reply via email to