This is an automated email from the ASF dual-hosted git repository.
dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 3dd46b979 NoSQL: Node IDs - API, SPI + general implementation (#2728)
3dd46b979 is described below
commit 3dd46b979e6d96774cf9c35dcf006759c43a2d5f
Author: Robert Stupp <[email protected]>
AuthorDate: Tue Oct 28 23:34:54 2025 +0100
NoSQL: Node IDs - API, SPI + general implementation (#2728)
* NoSQL: Node IDs - API, SPI + general implementation
This PR provides a mechanism to assign a Polaris-cluster-wide unique
node-ID to each Polaris instance, which is then used when generating
Polaris-cluster-wide unique Snowflake-IDs.
The change is fundamental for the NoSQL work, but also demanded for the
existing relational JDBC persistence.
Does not include any persistence specific implementation.
* NoSQL: Fail node-management-impl init after timeout
Also move the expensive part to a `@PostConstruct` to not block CDI
entirely from initializing.
---
bom/build.gradle.kts | 4 +
gradle/projects.main.properties | 4 +
persistence/nosql/nodes/README.md | 34 ++
persistence/nosql/nodes/api/build.gradle.kts | 44 ++
.../java/org/apache/polaris/nodeids/api/Node.java | 48 ++
.../org/apache/polaris/nodeids/api/NodeLease.java | 45 ++
.../apache/polaris/nodeids/api/NodeManagement.java | 72 +++
.../polaris/nodeids/api/NodeManagementConfig.java | 87 +++
persistence/nosql/nodes/impl/build.gradle.kts | 64 +++
.../polaris/nodeids/impl/NodeManagementImpl.java | 608 +++++++++++++++++++++
.../java/org/apache/polaris/nodeids/impl/Util.java | 38 ++
.../apache/polaris/nodeids/impl/package-info.java | 20 +
.../impl/src/main/resources/META-INF/beans.xml | 24 +
.../polaris/nodeids/impl/TestNodeLeases.java | 221 ++++++++
.../nodeids/impl/TestNodeManagementImpl.java | 244 +++++++++
.../nodes/impl/src/test/resources/logback-test.xml | 32 ++
.../apache/polaris/nodeids/impl/MockNodeStore.java | 74 +++
.../polaris/nodeids/impl/MockNodeStoreFactory.java | 56 ++
persistence/nosql/nodes/spi/build.gradle.kts | 41 ++
.../polaris/nodeids/spi/NodeManagementState.java | 30 +
.../org/apache/polaris/nodeids/spi/NodeState.java | 30 +
.../org/apache/polaris/nodeids/spi/NodeStore.java | 34 ++
.../polaris/nodeids/spi/NodeStoreFactory.java | 32 ++
23 files changed, 1886 insertions(+)
diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 90281d65f..c6f0713eb 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -44,6 +44,10 @@ dependencies {
api(project(":polaris-idgen-impl"))
api(project(":polaris-idgen-spi"))
+ api(project(":polaris-nodes-api"))
+ api(project(":polaris-nodes-impl"))
+ api(project(":polaris-nodes-spi"))
+
api(project(":polaris-config-docs-annotations"))
api(project(":polaris-config-docs-generator"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index f6c46f8be..c77a06fd7 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -57,5 +57,9 @@ polaris-idgen-api=persistence/nosql/idgen/api
polaris-idgen-impl=persistence/nosql/idgen/impl
polaris-idgen-mocks=persistence/nosql/idgen/mocks
polaris-idgen-spi=persistence/nosql/idgen/spi
+# nodes
+polaris-nodes-api=persistence/nosql/nodes/api
+polaris-nodes-impl=persistence/nosql/nodes/impl
+polaris-nodes-spi=persistence/nosql/nodes/spi
# persistence / database agnostic
polaris-persistence-nosql-varint=persistence/nosql/persistence/varint
diff --git a/persistence/nosql/nodes/README.md
b/persistence/nosql/nodes/README.md
new file mode 100644
index 000000000..19370b82f
--- /dev/null
+++ b/persistence/nosql/nodes/README.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Uniquely identify running Polaris nodes
+
+Some ID generation mechanisms,
+like
[Snowflake-IDs](https://medium.com/@jitenderkmr/demystifying-snowflake-ids-a-unique-identifier-in-distributed-computing-72796a827c9d),
+require unique integer IDs for each running node. This framework provides a
mechanism to assign each running node a
+unique integer ID.
+
+## Code structure
+
+The code is structured into multiple modules. Consuming code should almost
always pull in only the API module.
+
+* `polaris-nodes-api` provides the necessary Java interfaces and immutable
types.
+* `polaris-nodes-impl` provides the storage-agnostic implementation.
+* `polaris-nodes-spi` provides the interfaces needed to provide a storage-specific implementation.
+* `polaris-nodes-store-nosql` provides the storage implementation based on
`polaris-persistence-nosql-api`.
diff --git a/persistence/nosql/nodes/api/build.gradle.kts
b/persistence/nosql/nodes/api/build.gradle.kts
new file mode 100644
index 000000000..3374ed48f
--- /dev/null
+++ b/persistence/nosql/nodes/api/build.gradle.kts
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ id("org.kordamp.gradle.jandex") // presumably generates the Jandex annotation index for this module — confirm
+ id("polaris-server")
+}
+
+description = "Polaris nodes API, no concrete implementations"
+
+dependencies {
+ implementation(project(":polaris-idgen-api"))
+
+ compileOnly(project(":polaris-immutables")) // compile-time only; the API types are annotated with @PolarisImmutable
+ annotationProcessor(project(":polaris-immutables", configuration =
"processor"))
+
+ implementation(platform(libs.jackson.bom)) // Jackson versions are managed by the BOM
+ implementation("com.fasterxml.jackson.core:jackson-databind") // @JsonSerialize/@JsonDeserialize on NodeManagementConfig
+
+ compileOnly(libs.jakarta.annotation.api)
+ compileOnly(libs.jakarta.validation.api)
+ compileOnly(libs.jakarta.inject.api)
+ compileOnly(libs.jakarta.enterprise.cdi.api)
+
+ compileOnly(libs.smallrye.config.core) // @ConfigMapping/@WithDefault on NodeManagementConfig
+ compileOnly(platform(libs.quarkus.bom))
+ compileOnly("io.quarkus:quarkus-core")
+}
diff --git
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/Node.java
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/Node.java
new file mode 100644
index 000000000..4fb7e82bb
--- /dev/null
+++
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/Node.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import jakarta.annotation.Nullable;
+import java.time.Instant;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** Represents the local node's ID and informative, mutable state information.
*/
+@PolarisImmutable
+public interface Node {
+ /**
+ * Returns the ID of this node.
+ *
+ * @return ID of this node
+ * @throws IllegalStateException if the lease is no longer valid, for
example, expired before it
+ * being renewed
+ */
+ int id();
+
+ default boolean valid(long nowInMillis) {
+ return nowInMillis < expirationTimestamp().toEpochMilli();
+ }
+
+ Instant leaseTimestamp();
+
+ @Nullable
+ Instant renewLeaseTimestamp();
+
+ /** Timestamp since which this node's lease is no longer valid. */
+ Instant expirationTimestamp();
+}
diff --git
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeLease.java
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeLease.java
new file mode 100644
index 000000000..ee88a389f
--- /dev/null
+++
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeLease.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import jakarta.annotation.Nullable;
+
+public interface NodeLease { // Handle for one leased node ID, as returned by NodeManagement#lease().
+ /**
+ * Returns the {@link Node} representation for this lease, or {@code null} if the lease has been
+ * released.
+ *
+ * @return the leased node, or {@code null} after {@link #release()}
+ */
+ @Nullable
+ Node node();
+
+ /**
+ * Permanently releases the lease. Does nothing if the lease has already been released. Throws if
+ * persisting the released state fails.
+ */
+ void release();
+
+ /**
+ * Forces a lease renewal. This is generally neither recommended nor necessary. Throws if the
+ * lease has already been released.
+ */
+ void renew();
+
+ /**
+ * Returns the node ID if the lease is active/valid, or {@code -1} otherwise.
+ *
+ * @return the leased node ID, or {@code -1} if the lease is not active/valid
+ */
+ int nodeIdIfValid();
+}
diff --git
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagement.java
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagement.java
new file mode 100644
index 000000000..dd3cf72b6
--- /dev/null
+++
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagement.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+
+/**
+ * API to lease node IDs, primarily to generate {@linkplain SnowflakeIdGenerator snowflake IDs}.
+ *
+ * <p>The default configuration for the snowflake IDs allows generation of 4096 IDs per millisecond
+ * (12 sequence bits), which should be more than enough. As a consequence, it is very likely
+ * sufficient to have only one ID generator per JVM, across all realms and catalogs.
+ *
+ * <p>The implementation is provided as an {@link ApplicationScoped @ApplicationScoped} bean.
+ */
+public interface NodeManagement extends AutoCloseable {
+ /**
+ * Builds a <em>new</em> and <em>independent</em> ID generator instance for a {@linkplain #lease()
+ * leased node}.
+ *
+ * <p>This function must only be called from {@link ApplicationScoped @ApplicationScoped} CDI
+ * producers providing the same {@link IdGenerator} for the lifetime of the given lease, in other
+ * words at most once per {@link NodeLease} instance.
+ *
+ * @param leasedNode the lease that provides the node ID for the generated IDs
+ * @return a new ID generator bound to the given lease
+ */
+ IdGenerator buildIdGenerator(@Nonnull NodeLease leasedNode);
+
+ /**
+ * The maximum number of concurrently leased nodes that are supported.
+ *
+ * @return the maximum number of nodes
+ */
+ int maxNumberOfNodes();
+
+ /**
+ * Retrieves information about a specific node.
+ *
+ * @param nodeId the ID of the node to look up
+ * @return information about the node, or an empty optional if none is available
+ */
+ Optional<Node> getNodeInfo(int nodeId);
+
+ /**
+ * Gets the persistence ID for a node by its ID.
+ *
+ * @param nodeId the ID of the node
+ * @return the persistence ID for the given node ID
+ */
+ long systemIdForNode(int nodeId);
+
+ /**
+ * Leases a node.
+ *
+ * <p>The implementation takes care of periodically renewing the lease. It is not necessary to
+ * explicitly call {@link NodeLease#renew()}.
+ *
+ * <p>It is possible that the {@linkplain NodeLease#nodeIdIfValid() ID of the leased node} changes
+ * over time.
+ *
+ * <p>Each invocation returns a new, independent lease.
+ *
+ * @return the leased node
+ * @throws IllegalStateException if no node ID could be leased
+ */
+ @Nonnull
+ NodeLease lease();
+}
diff --git
a/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagementConfig.java
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagementConfig.java
new file mode 100644
index 000000000..c50aa951b
--- /dev/null
+++
b/persistence/nosql/nodes/api/src/main/java/org/apache/polaris/nodeids/api/NodeManagementConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.api;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/** Node management configuration, mapped from the {@code polaris.node} configuration prefix. */
+@ConfigMapping(prefix = "polaris.node")
+@JsonSerialize(as = ImmutableBuildableNodeManagementConfig.class)
+@JsonDeserialize(as = ImmutableBuildableNodeManagementConfig.class)
+public interface NodeManagementConfig {
+  /** Default for {@link #leaseDuration()}, in ISO-8601 duration notation. */
+  String DEFAULT_LEASE_DURATION = "PT1H";
+
+  /** Default for {@link #renewalPeriod()}, in ISO-8601 duration notation. */
+  String DEFAULT_RENEWAL_PERIOD = "PT15M";
+
+  /** Default for {@link #numNodes()}, derived from the default number of node ID bits. */
+  String DEFAULT_NUM_NODES = "" + (1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS);
+
+  /** Duration of a node-lease. */
+  @WithDefault(DEFAULT_LEASE_DURATION)
+  Duration leaseDuration();
+
+  /** Time window before the end of a node lease when the lease will be renewed. */
+  @WithDefault(DEFAULT_RENEWAL_PERIOD)
+  Duration renewalPeriod();
+
+  /**
+   * Maximum number of concurrently active Polaris nodes. Do not change this value or the ID
+   * generator spec, it is a rather internal property. See ID generator spec below.
+   */
+  @WithDefault(DEFAULT_NUM_NODES)
+  int numNodes();
+
+  /**
+   * Configuration needed to build an {@linkplain IdGenerator ID generator}. This configuration
+   * cannot be changed after one Polaris node has been successfully started. This specification
+   * will be ignored if a persisted one exists, but a warning shall be logged if both are
+   * different.
+   */
+  IdGeneratorSpec idGeneratorSpec();
+
+  /** Variant of the configuration that can be assembled programmatically via a builder. */
+  @PolarisImmutable
+  interface BuildableNodeManagementConfig extends NodeManagementConfig {
+
+    /** Creates a fresh builder for a {@link BuildableNodeManagementConfig}. */
+    static ImmutableBuildableNodeManagementConfig.Builder builder() {
+      return ImmutableBuildableNodeManagementConfig.builder();
+    }
+
+    /** Lease duration, {@link #DEFAULT_LEASE_DURATION} unless set on the builder. */
+    @Override
+    @Value.Default
+    default Duration leaseDuration() {
+      return Duration.parse(DEFAULT_LEASE_DURATION);
+    }
+
+    /** Renewal period, {@link #DEFAULT_RENEWAL_PERIOD} unless set on the builder. */
+    @Override
+    @Value.Default
+    default Duration renewalPeriod() {
+      return Duration.parse(DEFAULT_RENEWAL_PERIOD);
+    }
+
+    /** Number of nodes, always derived from the default number of node ID bits. */
+    @Override
+    default int numNodes() {
+      return 1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS;
+    }
+  }
+}
diff --git a/persistence/nosql/nodes/impl/build.gradle.kts
b/persistence/nosql/nodes/impl/build.gradle.kts
new file mode 100644
index 000000000..a76544ffb
--- /dev/null
+++ b/persistence/nosql/nodes/impl/build.gradle.kts
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ id("org.kordamp.gradle.jandex") // presumably generates the Jandex annotation index for this module — confirm
+ id("polaris-server")
+}
+
+description = "Polaris nodes management implementation"
+
+dependencies {
+ implementation(project(":polaris-nodes-api"))
+ implementation(project(":polaris-nodes-spi"))
+ implementation(project(":polaris-idgen-api"))
+ implementation(project(":polaris-idgen-spi"))
+ implementation(project(":polaris-async-api")) // presumably provides AsyncExec/Cancelable used by NodeManagementImpl — confirm
+
+ implementation(libs.guava) // Preconditions, Ints and @VisibleForTesting in NodeManagementImpl
+ implementation(libs.slf4j.api) // logging in NodeManagementImpl
+
+ compileOnly(platform(libs.jackson.bom))
+ compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+
+ compileOnly(project(":polaris-immutables"))
+ annotationProcessor(project(":polaris-immutables", configuration =
"processor"))
+
+ compileOnly(libs.jakarta.annotation.api)
+ compileOnly(libs.jakarta.validation.api)
+ compileOnly(libs.jakarta.inject.api)
+ compileOnly(libs.jakarta.enterprise.cdi.api)
+
+ testFixturesApi(project(":polaris-idgen-api"))
+ testFixturesApi(project(":polaris-nodes-api"))
+ testFixturesApi(project(":polaris-nodes-spi"))
+
+ testFixturesRuntimeOnly(libs.smallrye.jandex)
+
+ testFixturesApi(platform(libs.jackson.bom))
+ testFixturesApi("com.fasterxml.jackson.core:jackson-annotations")
+
+ testImplementation(project(":polaris-idgen-mocks"))
+ testImplementation(project(":polaris-async-java"))
+ testImplementation(testFixtures(project(":polaris-async-api")))
+
+ testFixturesApi(libs.jakarta.annotation.api)
+}
+
+tasks.withType<Javadoc> { isFailOnError = false } // Javadoc errors do not fail the build of this module
diff --git
a/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/NodeManagementImpl.java
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/NodeManagementImpl.java
new file mode 100644
index 000000000..50ff3653d
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/NodeManagementImpl.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.Integer.bitCount;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.ImmutableIdGeneratorSpec;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.spi.IdGeneratorFactory;
+import org.apache.polaris.ids.spi.IdGeneratorSource;
+import org.apache.polaris.nodeids.api.ImmutableNode;
+import org.apache.polaris.nodeids.api.Node;
+import org.apache.polaris.nodeids.api.NodeLease;
+import org.apache.polaris.nodeids.api.NodeManagement;
+import org.apache.polaris.nodeids.api.NodeManagementConfig;
+import org.apache.polaris.nodeids.spi.ImmutableBuildableNodeManagementState;
+import org.apache.polaris.nodeids.spi.ImmutableNodeState;
+import org.apache.polaris.nodeids.spi.NodeState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+import org.apache.polaris.nodeids.spi.NodeStoreFactory;
+import org.apache.polaris.nosql.async.AsyncExec;
+import org.apache.polaris.nosql.async.Cancelable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+class NodeManagementImpl implements NodeManagement {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NodeManagementImpl.class);
+
+ // Must be a power of 2: number of node IDs checked per batch in leaseInternal() and the minimum accepted numNodes.
+ private static final int CHECK_BATCH_SIZE = 16;
+ static final Duration RESCHEDULE_AFTER_FAILURE = Duration.ofSeconds(10); // not referenced in the visible part; presumably used by lease renewal — confirm
+ static final Duration RESCHEDULE_UNTIL_EXPIRATION = Duration.ofMinutes(1); // not referenced in the visible part; presumably used by lease renewal — confirm
+ static final Duration RENEWAL_MIN_LEFT_FOR_RENEWAL = Duration.ofSeconds(30); // not referenced in the visible part; presumably used by lease renewal — confirm
+ private final NodeManagementConfig config;
+ private final MonotonicClock clock;
+ private final int numNodeIds; // config.numNodes(), validated in the constructor to be a power of 2
+ private final Set<NodeLeaseImpl> registeredLeases =
ConcurrentHashMap.newKeySet(); // leases handed out by lease(); released in close()
+ private final AsyncExec scheduler;
+ private final NodeStoreFactory nodeStoreFactory;
+
+ private NodeStore nodeStore; // assigned in init()
+ private IdGeneratorFactory<?> idGenFactory; // assigned in init()
+ private IdGeneratorSpec idGenSpec; // effective spec, assigned at the end of init()
+ private IdGenerator systemIdGen; // assigned in init()
+
+ private volatile boolean closed; // set by close(); leaseInternal() refuses to lease afterwards
+
+  /**
+   * Creates the node management instance and validates the given configuration.
+   *
+   * <p>All configuration checks are evaluated and reported together via {@link
+   * #checkArgs(Runnable...)}. The minimum-duration checks are inclusive, so a value of exactly 15
+   * minutes is accepted, as stated by the error messages ("at least 15 minutes").
+   *
+   * @param config node management configuration to validate and use
+   * @param clock clock used for all lease timestamps
+   * @param nodeStoreFactory factory used by {@code init()} to access the management state and to
+   *     create the node store
+   * @param scheduler asynchronous executor kept for later use
+   * @throws IllegalArgumentException if one or more configuration values are invalid
+   */
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  @Inject
+  NodeManagementImpl(
+      NodeManagementConfig config,
+      MonotonicClock clock,
+      NodeStoreFactory nodeStoreFactory,
+      AsyncExec scheduler) {
+    var minDuration = Duration.ofMinutes(15);
+    var activePeriod = config.leaseDuration().minus(config.renewalPeriod());
+    this.nodeStoreFactory = nodeStoreFactory;
+    this.numNodeIds = config.numNodes();
+    checkArgs(
+        () ->
+            checkArgument(
+                // ">= 0": "at least 15 minutes" includes exactly 15 minutes
+                config.leaseDuration().compareTo(minDuration) >= 0,
+                "leaseDuration must be at least 15 minutes"),
+        () ->
+            checkArgument(
+                activePeriod.isPositive(), "leaseDuration must be greater than renewalPeriod"),
+        () ->
+            checkArgument(
+                // ">= 0": "at least 15 minutes" includes exactly 15 minutes
+                activePeriod.compareTo(minDuration) >= 0,
+                "active period (leaseDuration - renewalPeriod) must be at least 15 minutes"),
+        () ->
+            checkArgument(
+                // exactly one bit set means power of 2
+                numNodeIds >= CHECK_BATCH_SIZE && bitCount(numNodeIds) == 1,
+                "numNodeIds %s must not be smaller than %s and a power of 2",
+                numNodeIds,
+                CHECK_BATCH_SIZE));
+    this.config = config;
+    this.clock = clock;
+    this.scheduler = scheduler;
+  }
+
+ @SuppressWarnings("BusyWait")
+ @PostConstruct
+ void init() { // Determines the effective ID generator spec (a persisted spec wins over the configured one) and creates the node store.
+ var idGenSpec =
+ (IdGeneratorSpec)
ImmutableIdGeneratorSpec.builder().from(config.idGeneratorSpec()).build(); // immutable copy of the configured spec
+ var validationIdGeneratorSource =
+ new IdGeneratorSource() { // source with the fixed node ID 0, only passed to validateParameters() below
+ @Override
+ public long currentTimeMillis() {
+ return clock.currentTimeMillis();
+ }
+
+ @Override
+ public int nodeId() {
+ return 0;
+ }
+ };
+
+ // If this loop doesn't complete within 10 minutes, we can only give up.
+ var timeout = clock.currentInstant().plus(Duration.ofMinutes(10));
+ while (true) {
+ var existingNodeManagementState =
nodeStoreFactory.fetchManagementState();
+ if (existingNodeManagementState.isPresent()) {
+ var spec =
Util.idgenSpecFromManagementState(existingNodeManagementState);
+ if (!idGenSpec.equals(spec)) {
+ warnOnIncompatibleIdGeneratorSpec(spec);
+ idGenSpec = spec; // the persisted spec replaces the configured one
+ }
+ var factory = IdGeneratorFactory.lookupFactory(idGenSpec.type());
+ // validate the parameters of the (persisted) spec before using the factory
+ factory.validateParameters(idGenSpec.params(),
validationIdGeneratorSource);
+ idGenFactory = factory;
+ break;
+ } else {
+ var factory = IdGeneratorFactory.lookupFactory(idGenSpec.type());
+ // validate the parameters of the configured spec before trying to persist it
+ factory.validateParameters(idGenSpec.params(),
validationIdGeneratorSource);
+
+ if (nodeStoreFactory.storeManagementState(
+
ImmutableBuildableNodeManagementState.builder().idGeneratorSpec(idGenSpec).build()))
{
+ LOGGER.info("Persisted node management configuration.");
+ idGenFactory = factory;
+ break;
+ } // otherwise nothing was persisted by this call: fall through and retry (presumably a concurrent writer — confirm)
+ }
+ if (timeout.isBefore(clock.currentInstant())) {
+ throw new IllegalStateException(
+ "Timed out to fetch and/or persist node management configuration.
This is likely due to an overloaded backend database.");
+ }
+ try {
+ // sleep for a random 10..499 ms before the next attempt
+ Thread.sleep(ThreadLocalRandom.current().nextInt(10, 500));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before failing
+ throw new IllegalStateException(
+ "Interrupted while waiting for node management configuration to be
fetched/persisted",
+ e);
+ }
+ }
+
+ this.idGenSpec = idGenSpec; // the field holds the effective spec only from here on
+
+ var nodeIdGen = idGenFactory.buildSystemIdGenerator(idGenSpec.params());
+ this.systemIdGen = nodeIdGen;
+ this.nodeStore = nodeStoreFactory.createNodeStore(nodeIdGen);
+ }
+
+  /**
+   * Runs every given check and reports all failures at once.
+   *
+   * <p>Each check signals a violation by throwing an {@link IllegalArgumentException}. The
+   * messages of all violations are joined with {@code ", "} into a single exception, so callers
+   * see every problem instead of only the first one.
+   *
+   * @param checks the checks to run, in order
+   * @throws IllegalArgumentException if at least one check failed
+   */
+  static void checkArgs(Runnable... checks) {
+    var messages = new ArrayList<String>();
+    for (var check : checks) {
+      try {
+        check.run();
+      } catch (IllegalArgumentException failed) {
+        messages.add(failed.getMessage());
+      }
+    }
+    if (messages.isEmpty()) {
+      return;
+    }
+    throw new IllegalArgumentException(String.join(", ", messages));
+  }
+
+ @Override
+ public long systemIdForNode(int nodeId) {
+ return systemIdGen.systemIdForNode(nodeId);
+ }
+
+  /**
+   * Builds an ID generator for the given lease, using the effective ID generator spec.
+   *
+   * <p>The generator reads the time from this instance's clock and asks the lease for its node
+   * ID on every {@code nodeId()} call.
+   */
+  @Override
+  public IdGenerator buildIdGenerator(@Nonnull NodeLease leasedNode) {
+    return idGenFactory.buildIdGenerator(
+        idGenSpec.params(),
+        new IdGeneratorSource() {
+          @Override
+          public int nodeId() {
+            return leasedNode.nodeIdIfValid();
+          }
+
+          @Override
+          public long currentTimeMillis() {
+            return clock.currentTimeMillis();
+          }
+        });
+  }
+
+  /**
+   * Logs a warning that the configured ID generator spec differs from the persisted one.
+   *
+   * <p>The "provided" value is taken from the configuration, not from the {@code idGenSpec}
+   * field: {@code init()} calls this method before that field is assigned, so reading the field
+   * would log {@code null}.
+   *
+   * @param spec the persisted ID generator spec, which takes precedence
+   */
+  @VisibleForTesting
+  void warnOnIncompatibleIdGeneratorSpec(IdGeneratorSpec spec) {
+    // Same immutable copy of the configured spec that init() compares against the persisted one.
+    var provided = ImmutableIdGeneratorSpec.builder().from(config.idGeneratorSpec()).build();
+    LOGGER.warn(
+        "Provided ID generator specification does not match the persisted, unmodifiable one: provided: {} - persisted: {}",
+        provided,
+        spec);
+  }
+
+  /** Returns the configured number of node IDs. */
+  @Override
+  public int maxNumberOfNodes() {
+    return this.numNodeIds;
+  }
+
+  /**
+   * Looks up a node, preferring leases held by this instance over the node store.
+   *
+   * @throws IllegalArgumentException if {@code nodeId} is outside {@code [0, numNodeIds)}
+   */
+  @Override
+  public Optional<Node> getNodeInfo(int nodeId) {
+    var inRange = nodeId >= 0 && nodeId < numNodeIds;
+    checkArgument(inRange, "Illegal node ID " + nodeId);
+
+    // Leases registered with this instance come first.
+    for (var lease : registeredLeases) {
+      Node candidate = lease.node();
+      if (candidate != null && candidate.id() == nodeId) {
+        return Optional.of(candidate);
+      }
+    }
+
+    // Not leased locally: build the node information from the persisted state, if any.
+    return nodeStore
+        .fetch(nodeId)
+        .map(
+            state ->
+                ImmutableNode.builder()
+                    .id(nodeId)
+                    .leaseTimestamp(state.leaseTimestamp())
+                    .expirationTimestamp(state.expirationTimestamp())
+                    .build());
+  }
+
+  /** Leases a node ID and registers the resulting lease so that {@link #close()} releases it. */
+  @Override
+  @Nonnull
+  public NodeLease lease() {
+    var newLease = new NodeLeaseImpl(leaseInternal());
+    registeredLeases.add(newLease);
+    return newLease;
+  }
+
+ private LeaseParams leaseInternal() { // Leases a free node ID: first a candidate derived from the network interfaces, then all IDs in batches.
+ LOGGER.debug("Leasing a node ID...");
+
+ checkState(!closed, "NodeManagement instance has been closed");
+
+ var now = clock.currentInstant(); // reference time used by canLease() to detect expired leases
+
+ // First, try with a hash over the addresses of the local network interfaces.
+ // The node ID in this attempt is somewhat deterministic for the local machine.
+ // This is not really necessary, but it can reduce the overall number of ever-allocated node
+ // IDs.
+ try {
+ var hashOverNetworkInterfaces =
+ NetworkInterface.networkInterfaces()
+ .mapToInt(
+ ni -> {
+ try {
+ if (ni.isUp()
+ && !ni.isLoopback()
+ && !ni.isVirtual()
+ && !ni.isPointToPoint()) {
+ return ni.getInterfaceAddresses().stream()
+ .mapToInt(InterfaceAddress::hashCode)
+ .reduce((a, b) -> 31 * a + b)
+ .orElse(0);
+ }
+ } catch (SocketException e) {
+ // ignore: an interface that cannot be queried contributes 0 to the hash
+ }
+ return 0;
+ })
+ .reduce((a, b) -> 31 * a + b)
+ .orElse(0);
+
+ var nodeId = hashOverNetworkInterfaces & (numNodeIds - 1); // mask is valid because numNodeIds is a power of 2 (checked in the constructor)
+
+ var leased = tryLeaseFromCandidates(new int[] {nodeId}, now);
+ if (leased != null) {
+ return leased;
+ }
+ } catch (SocketException e) {
+ // ignore: continue with the search over all node IDs below
+ }
+
+ // If the lease attempt using the hash over the network interfaces did not succeed, try all
+ // node IDs, starting at a randomly picked position.
+
+ var nodeIdsToCheck = IntStream.range(0, numNodeIds).toArray();
+ Ints.rotate(nodeIdsToCheck,
ThreadLocalRandom.current().nextInt(numNodeIds)); // random rotation, so the search starts at a random node ID
+
+ for (int i = 0; i < numNodeIds; i += CHECK_BATCH_SIZE) {
+ var ids = Arrays.copyOfRange(nodeIdsToCheck, i, i + CHECK_BATCH_SIZE); // numNodeIds is a power of 2 >= CHECK_BATCH_SIZE, so batches stay within the array
+ var leased = tryLeaseFromCandidates(ids, now);
+ if (leased != null) {
+ return leased;
+ }
+ }
+
+ // No approach worked, give up - boom!
+ throw new IllegalStateException("Could not lease any node ID");
+ }
+
+  /**
+   * Marks this instance as closed and releases all registered leases.
+   *
+   * <p>Every lease is released even if an earlier release fails. The first failure is rethrown at
+   * the end, with all later failures attached as suppressed exceptions.
+   */
+  @Override
+  @PreDestroy
+  public void close() {
+    LOGGER.debug("Closing NodeManagement");
+    closed = true;
+    // Work on a copy: NodeLeaseImpl.release() modifies 'registeredLeases' and also checks against
+    // it.
+    var snapshot = new ArrayList<>(registeredLeases);
+    RuntimeException failure = null;
+    for (var lease : snapshot) {
+      try {
+        lease.release();
+      } catch (RuntimeException e) {
+        if (failure != null) {
+          failure.addSuppressed(e);
+        } else {
+          failure = e;
+        }
+      }
+    }
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * Fetches the stored states of the candidate node IDs and tries to lease them in array order.
+   *
+   * @param nodeIds candidate node IDs
+   * @param now timestamp used to decide whether an existing lease has expired
+   * @return the parameters of the first lease that could be acquired, or {@code null} if none of
+   *     the candidates could be leased
+   */
+  private LeaseParams tryLeaseFromCandidates(int[] nodeIds, Instant now) {
+    // NodeStore.fetchMany MUST return as many elements as the input array.
+    var states = nodeStore.fetchMany(nodeIds);
+    for (int idx = 0; idx < nodeIds.length; idx++) {
+      var candidateId = nodeIds[idx];
+      var state = states[idx];
+      if (!canLease(state, now)) {
+        continue;
+      }
+      var lease = tryLease(candidateId, state);
+      if (lease != null) {
+        return lease;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Tells whether a node ID may be leased: either no state is stored for it, or the stored
+   * expiration timestamp lies strictly before {@code now}.
+   */
+  private boolean canLease(NodeState node, Instant now) {
+    return node == null || node.expirationTimestamp().isBefore(now);
+  }
+
+  /**
+   * Attempts to take the lease for one node ID via a conditional persist.
+   *
+   * @param nodeId node ID to lease
+   * @param node state currently known for the node ID, {@code null} if there is none
+   * @return the lease parameters, or {@code null} if the conditional persist did not succeed
+   * @throws IllegalStateException if the store reports a persisted state that differs from the
+   *     state that was handed to it
+   */
+  private LeaseParams tryLease(int nodeId, NodeState node) {
+    var leasedAt = clock.currentInstant();
+    var expiresAt = leasedAt.plus(config.leaseDuration());
+
+    LOGGER.debug("Try lease node ID {} ...", nodeId);
+
+    var candidate =
+        ImmutableNodeState.builder()
+            .leaseTimestamp(leasedAt)
+            .expirationTimestamp(expiresAt)
+            .build();
+    var expected = Optional.ofNullable(node);
+    var persisted = nodeStore.persist(nodeId, expected, candidate);
+    if (persisted == null) {
+      return null;
+    }
+    checkState(
+        persisted.equals(candidate),
+        "Internal error: value of persisted %s != to-persist %s",
+        persisted,
+        candidate);
+
+    // conditional insert/update succeeded - got the lease!
+
+    var renewAt = persisted.expirationTimestamp().minus(config.renewalPeriod());
+    return new LeaseParams(nodeId, persisted, renewAt);
+  }
+
+  /**
+   * Immutable description of one acquired lease.
+   *
+   * @param nodeId the leased node ID
+   * @param nodeState the node state belonging to the lease
+   * @param renewLeaseTimestamp point in time at which the lease is due for renewal
+   * @param expirationTimestampMillis lease expiration as epoch milliseconds
+   */
+  record LeaseParams(
+      int nodeId,
+      NodeState nodeState,
+      Instant renewLeaseTimestamp,
+      long expirationTimestampMillis) {
+    /** Convenience constructor deriving the expiration millis from the given node state. */
+    LeaseParams(int id, NodeState state, Instant renewAt) {
+      this(id, state, renewAt, state.expirationTimestamp().toEpochMilli());
+    }
+  }
+
+ class NodeLeaseImpl implements NodeLease {
+ private volatile LeaseParams leaseParams;
+ private volatile Cancelable<Void> renewFuture;
+ private final Lock lock = new ReentrantLock();
+
+ NodeLeaseImpl(LeaseParams leaseParams) {
+ newLease(leaseParams);
+ }
+
+ private void newLease(LeaseParams leaseParams) {
+ this.leaseParams = leaseParams;
+ var renewalIn =
+ Duration.ofMillis(
+ leaseParams.renewLeaseTimestamp.toEpochMilli() -
clock.currentTimeMillis());
+ reschedule(renewalIn);
+ LOGGER.info(
+ "New lease for node#{} acquired, valid until {} (renewal scheduled
in {}).",
+ leaseParams.nodeId,
+ leaseParams.nodeState.expirationTimestamp(),
+ renewalIn.truncatedTo(ChronoUnit.SECONDS));
+ }
+
+ private void reschedule(Duration delay) {
+ // 'delay' can safely be negative, which is handled by the scheduler.
+ this.renewFuture = scheduler.schedule(this::renewAndReschedule, delay);
+ }
+
+ private void renewAndReschedule() {
+ if (closed) {
+ return;
+ }
+ lock.lock();
+ try {
+ var lp = leaseParams;
+ if (lp == null) {
+ return;
+ }
+ var id = lp.nodeId;
+ var validFor = Duration.ofMillis(lp.expirationTimestampMillis -
clock.currentTimeMillis());
+ if (validFor.compareTo(RENEWAL_MIN_LEFT_FOR_RENEWAL) > 0) {
+ LOGGER.debug("Renewing lease for node#{}...", id);
+ try {
+ // The call to 'renewLease()` can throw an ISE if this call races
w/
+ // NodeManagement.close().
+ // In that case, it's not just an annoyance, nothing more.
+ var newRenewalTimestamp = renewInternal(lp);
+ LOGGER.info("Lease for node#{} renewed until {}.", id,
newRenewalTimestamp);
+ reschedule(
+ Duration.ofMillis(newRenewalTimestamp.toEpochMilli() -
clock.currentTimeMillis()));
+ return;
+ } catch (Exception e) {
+ if (validFor.compareTo(RESCHEDULE_UNTIL_EXPIRATION) > 0) {
+ LOGGER.warn(
+ "Could not renew lease for node#{} (lease still valid for
{}), retrying in 10 seconds",
+ id,
+ validFor,
+ e);
+ reschedule(RESCHEDULE_AFTER_FAILURE);
+ return;
+ } else {
+ LOGGER.warn(
+ "Could not renew lease for node#{} (lease still valid for
{}), attempting to get a new lease ...",
+ id,
+ validFor,
+ e);
+ }
+ }
+ } else {
+ LOGGER.warn(
+ "Not renewing node lease for node#{}, because the lease is
currently no longer valid. "
+ + "The node lease should have been valid, but was not. This
may happen if the node was massively overloaded.",
+ id);
+ }
+
+ try {
+ var newLease = leaseInternal();
+ newLease(newLease);
+ } catch (Exception e) {
+ LOGGER.warn("Could not get a new lease, reattempting in 10 seconds",
e);
+ reschedule(RESCHEDULE_AFTER_FAILURE);
+ }
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int nodeIdIfValid() {
+ var lp = leaseParams;
+ if (lp == null || lp.expirationTimestampMillis <=
clock.currentTimeMillis()) {
+ return -1;
+ }
+ return lp.nodeId;
+ }
+
+ @Override
+ public Node node() {
+ var lp = leaseParams;
+ return lp != null
+ ? ImmutableNode.builder()
+ .id(lp.nodeId)
+ .expirationTimestamp(lp.nodeState.expirationTimestamp())
+ .leaseTimestamp(lp.nodeState.leaseTimestamp())
+ .renewLeaseTimestamp(lp.renewLeaseTimestamp)
+ .build()
+ : null;
+ }
+
+ @Override
+ public void release() {
+ lock.lock();
+ try {
+ if (!registeredLeases.remove(this)) {
+ return;
+ }
+
+ // Shall never be null, but be safe here.
+ if (renewFuture != null) {
+ renewFuture.cancel();
+ }
+
+ var lp = leaseParams;
+ if (lp == null) {
+ return;
+ }
+ var id = lp.nodeId;
+ LOGGER.info("Releasing lease for node id {}", id);
+
+ var now = clock.currentInstant();
+ var activeState = lp.nodeState;
+ var nodeAsReleased =
+
ImmutableNodeState.builder().from(activeState).expirationTimestamp(now).build();
+
+ leaseParams = null;
+
+ var updated = nodeStore.persist(id, Optional.of(activeState),
nodeAsReleased);
+ checkState(
+ updated != null && updated.equals(nodeAsReleased),
+ "State of the node %s has been unexpectedly changed: value of
persisted %s != to-persist %s",
+ id,
+ updated,
+ nodeAsReleased);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void renew() {
+ lock.lock();
+ try {
+ var lp = leaseParams;
+ checkState(lp != null, "Lease has already been released");
+ renewInternal(lp);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private Instant renewInternal(LeaseParams lp) {
+ var id = lp.nodeId;
+ LOGGER.debug("Renewing lease for node id {}", id);
+
+ var now = clock.currentInstant();
+ var expire = now.plus(config.leaseDuration());
+ var renewal = expire.minus(config.renewalPeriod());
+
+ var newNode =
+ ImmutableNodeState.builder()
+ .leaseTimestamp(lp.nodeState.leaseTimestamp())
+ .expirationTimestamp(expire)
+ .build();
+
+ var persisted = nodeStore.persist(id, Optional.of(lp.nodeState),
newNode);
+ checkState(
+ persisted != null,
+ "State of the NodeState for nodeId %s has been unexpectedly
changed!",
+ id);
+ checkState(
+ persisted.equals(newNode),
+ "Internal error: value of persisted %s != to-persist %s",
+ persisted,
+ newNode);
+
+ leaseParams = new LeaseParams(id, persisted, renewal);
+
+ return renewal;
+ }
+ }
+}
diff --git
a/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/Util.java
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/Util.java
new file mode 100644
index 000000000..eaf24887e
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/Util.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.ImmutableBuildableIdGeneratorSpec;
+import org.apache.polaris.nodeids.spi.NodeManagementState;
+
+public final class Util {
+ private Util() {}
+
+ @VisibleForTesting
+ public static IdGeneratorSpec idgenSpecFromManagementState(
+ Optional<NodeManagementState> existingNodeManagementState) {
+ return existingNodeManagementState
+ .orElseThrow()
+ .idGeneratorSpec()
+ .orElse(ImmutableBuildableIdGeneratorSpec.builder().build());
+ }
+}
diff --git
a/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/package-info.java
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/package-info.java
new file mode 100644
index 000000000..549d09dc5
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/main/java/org/apache/polaris/nodeids/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/** Node management implementation; do not directly use the types in this package. */
+package org.apache.polaris.nodeids.impl;
diff --git a/persistence/nosql/nodes/impl/src/main/resources/META-INF/beans.xml
b/persistence/nosql/nodes/impl/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ b/persistence/nosql/nodes/impl/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
+ <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git
a/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeLeases.java
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeLeases.java
new file mode 100644
index 000000000..ae79f8914
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeLeases.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import static java.time.temporal.ChronoUnit.MILLIS;
+import static java.util.Objects.requireNonNull;
+import static
org.apache.polaris.nodeids.impl.NodeManagementImpl.RESCHEDULE_AFTER_FAILURE;
+import static org.assertj.core.api.InstanceOfAssertFactories.BOOLEAN;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.nodeids.api.NodeManagementConfig;
+import org.apache.polaris.nodeids.spi.NodeState;
+import org.apache.polaris.nosql.async.MockAsyncExec;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestNodeLeases {
+ @InjectSoftAssertions protected SoftAssertions soft;
+
+ MockAsyncExec scheduler;
+ MutableMonotonicClock clock;
+
+ @BeforeEach
+ void beforeEach() {
+ clock = new MutableMonotonicClock();
+ scheduler = new MockAsyncExec(clock);
+ }
+
+ @Test
+ public void leaseAgainAfterStalledExecutor() {
+ var config =
+ NodeManagementConfig.BuildableNodeManagementConfig.builder()
+
.idGeneratorSpec(IdGeneratorSpec.BuildableIdGeneratorSpec.builder().build())
+ .build();
+ var renewInterval = config.leaseDuration().minus(config.renewalPeriod());
+ try (var mgmt = new NodeManagementImpl(config, clock, new
MockNodeStoreFactory(), scheduler)) {
+ mgmt.init();
+
+ soft.assertThat(scheduler.tasks()).isEmpty();
+
+ var lease = mgmt.lease();
+ var node = requireNonNull(lease.node());
+ soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+
+ soft.assertThat(scheduler.tasks())
+ .hasSize(1)
+ .first()
+ .extracting(MockAsyncExec.Task::nextExecution)
+ .isEqualTo(clock.currentInstant().plus(renewInterval));
+ soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+
+ soft.assertThat(scheduler.nextReady()).isEmpty();
+
+ clock.advanceBoth(renewInterval);
+ soft.assertThat(scheduler.nextReady())
+ .map(MockAsyncExec.Task::call)
+ .get()
+ .extracting(MockAsyncExec.CallResult::called, BOOLEAN)
+ .isTrue();
+
+ soft.assertThat(
+
scheduler.readyCount(clock.currentInstant().plus(renewInterval).minus(1,
MILLIS)))
+ .isEqualTo(0L);
+
soft.assertThat(scheduler.readyCount(clock.currentInstant().plus(renewInterval)))
+ .isEqualTo(1L);
+
+ soft.assertThat(requireNonNull(lease.node()).id()).isEqualTo(node.id());
+ soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+ .isEqualTo(node.expirationTimestamp().plus(renewInterval));
+
+ // Advance the clock _after/at_ expiration, so the node-lease has to re-lease
+
+ clock.advanceBoth(config.leaseDuration());
+ soft.assertThat(lease.nodeIdIfValid()).isEqualTo(-1);
+
+ soft.assertThat(scheduler.nextReady())
+ .map(MockAsyncExec.Task::call)
+ .get()
+ .extracting(MockAsyncExec.CallResult::called, BOOLEAN)
+ .isTrue();
+
+ soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+
soft.assertThat(requireNonNull(lease.node()).id()).isNotEqualTo(node.id());
+ soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+ .isEqualTo(clock.currentInstant().plus(config.leaseDuration()));
+ }
+ soft.assertThat(scheduler.tasks()).isEmpty();
+ }
+
+ @Test
+ public void rescheduleWorksAfterPersistFailures() {
+ var config =
+ NodeManagementConfig.BuildableNodeManagementConfig.builder()
+
.idGeneratorSpec(IdGeneratorSpec.BuildableIdGeneratorSpec.builder().build())
+ .build();
+ var renewInterval = config.leaseDuration().minus(config.renewalPeriod());
+ var persistFailure = new AtomicBoolean(false);
+ var mockStore =
+ new MockNodeStore() {
+ @Nullable
+ @Override
+ public NodeState persist(
+ int nodeId, Optional<NodeState> expectedNodeState, @Nonnull
NodeState newState) {
+ if (persistFailure.compareAndSet(true, false)) {
+ throw new RuntimeException("forced persist failure");
+ }
+ return super.persist(nodeId, expectedNodeState, newState);
+ }
+ };
+ try (var mgmt =
+ new NodeManagementImpl(config, clock, new
MockNodeStoreFactory(mockStore), scheduler)) {
+ mgmt.init();
+
+ soft.assertThat(scheduler.tasks()).isEmpty();
+
+ var lease = mgmt.lease();
+ var node = requireNonNull(lease.node());
+
+ soft.assertThat(scheduler.tasks())
+ .hasSize(1)
+ .first()
+ .extracting(MockAsyncExec.Task::nextExecution)
+ .isEqualTo(clock.currentInstant().plus(renewInterval));
+
+ soft.assertThat(scheduler.nextReady()).isEmpty();
+
+ clock.advanceBoth(renewInterval);
+
+ // Force a failure during renewal
+ persistFailure.set(true);
+
+ soft.assertThat(scheduler.nextReady())
+ .map(MockAsyncExec.Task::call)
+ .get()
+ .extracting(MockAsyncExec.CallResult::called,
MockAsyncExec.CallResult::failure)
+ .containsExactly(true, null);
+
+ soft.assertThat(lease.node()).isEqualTo(node);
+
+ soft.assertThat(
+ scheduler.readyCount(
+
clock.currentInstant().plus(RESCHEDULE_AFTER_FAILURE).minus(1, MILLIS)))
+ .isEqualTo(0L);
+
soft.assertThat(scheduler.readyCount(clock.currentInstant().plus(RESCHEDULE_AFTER_FAILURE)))
+ .isEqualTo(1L);
+
+ clock.advanceBoth(RESCHEDULE_AFTER_FAILURE);
+
+ // renewal working
+
+ soft.assertThat(scheduler.nextReady())
+ .map(MockAsyncExec.Task::call)
+ .get()
+ .extracting(MockAsyncExec.CallResult::called,
MockAsyncExec.CallResult::failure)
+ .containsExactly(true, null);
+
+ soft.assertThat(requireNonNull(lease.node()).id()).isEqualTo(node.id());
+ soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+
.isEqualTo(node.expirationTimestamp().plus(RESCHEDULE_AFTER_FAILURE).plus(renewInterval));
+
+ node = lease.node();
+
+ // simulate a persistence failure during re-lease operation
+
+ clock.advanceBoth(config.leaseDuration());
+ persistFailure.set(true);
+ soft.assertThat(scheduler.nextReady())
+ .map(MockAsyncExec.Task::call)
+ .get()
+ .extracting(MockAsyncExec.CallResult::called,
MockAsyncExec.CallResult::failure)
+ .containsExactly(true, null);
+ soft.assertThat(lease.nodeIdIfValid()).isEqualTo(-1);
+ soft.assertThat(lease.node()).isEqualTo(node);
+
+ clock.advanceBoth(RESCHEDULE_AFTER_FAILURE);
+
+ soft.assertThat(scheduler.nextReady())
+ .map(MockAsyncExec.Task::call)
+ .get()
+ .extracting(MockAsyncExec.CallResult::called,
MockAsyncExec.CallResult::failure)
+ .containsExactly(true, null);
+
+ soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+ soft.assertThat(requireNonNull(lease.node()).expirationTimestamp())
+ .isEqualTo(clock.currentInstant().plus(config.leaseDuration()));
+
+ soft.assertThat(scheduler.nextReady()).isEmpty();
+
+ lease.release();
+
+ soft.assertThat(scheduler.tasks()).isEmpty();
+ }
+ soft.assertThat(scheduler.tasks()).isEmpty();
+ }
+}
diff --git
a/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeManagementImpl.java
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeManagementImpl.java
new file mode 100644
index 000000000..2433d412d
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/test/java/org/apache/polaris/nodeids/impl/TestNodeManagementImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+
+import jakarta.annotation.Nonnull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.ids.api.ImmutableIdGeneratorSpec;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.impl.MonotonicClockImpl;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.nodeids.api.NodeLease;
+import org.apache.polaris.nodeids.api.NodeManagementConfig;
+import org.apache.polaris.nodeids.spi.ImmutableBuildableNodeManagementState;
+import org.apache.polaris.nodeids.spi.NodeManagementState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+import org.apache.polaris.nodeids.spi.NodeStoreFactory;
+import org.apache.polaris.nosql.async.AsyncExec;
+import org.apache.polaris.nosql.async.java.JavaPoolAsyncExec;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestNodeManagementImpl {
+ @InjectSoftAssertions protected SoftAssertions soft;
+
+ AsyncExec scheduler;
+ MonotonicClock clock;
+
+ @BeforeEach
+ void beforeEach() {
+ clock = MonotonicClockImpl.newDefaultInstance();
+ scheduler = new JavaPoolAsyncExec();
+ }
+
+ @AfterEach
+ void afterEach() throws Exception {
+ ((AutoCloseable) scheduler).close();
+ clock.close();
+ }
+
+ @Test
+ public void incompatibleNodeManagementConfig() {
+ var incompatibleExistingState =
+ ImmutableBuildableNodeManagementState.builder()
+ .idGeneratorSpec(
+ ImmutableIdGeneratorSpec.builder()
+ .type("snowflake")
+ .params(Map.of("offset", "2024-11-21T12:44:51.134Z"))
+ .build())
+ .build();
+
+ var config =
+ NodeManagementConfig.BuildableNodeManagementConfig.builder()
+ .idGeneratorSpec(
+ ImmutableIdGeneratorSpec.builder()
+ .type("snowflake")
+ .params(
+ Map.of(
+ "timestamp-bits",
+ "1",
+ "node-id-bits",
+ "2",
+ "sequence-bits",
+ "3",
+ "offset",
+ "2024-11-21T12:44:51.134Z"))
+ .build())
+ .build();
+ var incompatible = new AtomicBoolean(false);
+ try (var mgmt =
+ new NodeManagementImpl(
+ config,
+ clock,
+ new NodeStoreFactory() {
+ @Nonnull
+ @Override
+ public NodeStore createNodeStore(@Nonnull IdGenerator
idGenerator) {
+ return new MockNodeStore();
+ }
+
+ @Override
+ public Optional<NodeManagementState> fetchManagementState() {
+ return Optional.of(incompatibleExistingState);
+ }
+
+ @Override
+ public boolean storeManagementState(@Nonnull NodeManagementState
state) {
+ throw new UnsupportedOperationException();
+ }
+ },
+ scheduler) {
+ @Override
+ void warnOnIncompatibleIdGeneratorSpec(IdGeneratorSpec spec) {
+ incompatible.set(true);
+ super.warnOnIncompatibleIdGeneratorSpec(spec);
+ }
+ }) {
+ mgmt.init();
+
+ soft.assertThat(incompatible).isTrue();
+ var node = mgmt.lease();
+ var idGen = mgmt.buildIdGenerator(node);
+ soft.assertThat(idGen)
+ .isInstanceOf(SnowflakeIdGenerator.class)
+ .asInstanceOf(type(SnowflakeIdGenerator.class))
+ .extracting(
+ SnowflakeIdGenerator::timestampBits,
+ SnowflakeIdGenerator::nodeIdBits,
+ SnowflakeIdGenerator::sequenceBits)
+ .containsExactly(
+ SnowflakeIdGenerator.DEFAULT_TIMESTAMP_BITS,
+ SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS,
+ SnowflakeIdGenerator.DEFAULT_SEQUENCE_BITS);
+ }
+ }
+
+ @Test
+ public void simple() {
+ var config =
+ NodeManagementConfig.BuildableNodeManagementConfig.builder()
+ .idGeneratorSpec(
+ ImmutableIdGeneratorSpec.builder()
+ .type("snowflake")
+ .params(Map.of("offset", "2024-11-21T12:44:51.134Z"))
+ .build())
+ .build();
+ try (var mgmt = new NodeManagementImpl(config, clock, new
MockNodeStoreFactory(), scheduler)) {
+ mgmt.init();
+
+ soft.assertThat(mgmt.maxNumberOfNodes()).isEqualTo(config.numNodes());
+ var lease = mgmt.lease();
+ soft.assertThat(lease).isNotNull();
+ soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+ }
+ }
+
+ @Test
+ public void allocateAll() {
+ NodeManagementImpl m;
+ var config =
+ NodeManagementConfig.BuildableNodeManagementConfig.builder()
+ .idGeneratorSpec(
+ ImmutableIdGeneratorSpec.builder()
+ .type("snowflake")
+ .params(Map.of("offset", "2024-11-21T12:44:51.134Z"))
+ .build())
+ .build();
+ try (var mutableClock = new MutableMonotonicClock();
+ var mgmt =
+ new NodeManagementImpl(config, mutableClock, new
MockNodeStoreFactory(), scheduler)) {
+ mgmt.init();
+
+ var numNodeIds = 1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS;
+ var leases = new ArrayList<NodeLease>();
+ for (int i = 0; i < numNodeIds; i++) {
+ soft.assertThatCode(() -> leases.add(mgmt.lease()))
+ .describedAs("n = %d", i)
+ .doesNotThrowAnyException();
+ }
+ soft.assertThatIllegalStateException()
+ .isThrownBy(mgmt::lease)
+ .withMessage("Could not lease any node ID");
+
+ soft.assertThat(leases).hasSize(numNodeIds);
+
+ for (var lease : leases) {
+ soft.assertThat(lease.nodeIdIfValid()).isNotEqualTo(-1);
+ soft.assertThat(
+ requireNonNull(lease.node())
+ .valid(lease.node().expirationTimestamp().toEpochMilli()))
+ .isFalse();
+ }
+
+ // Release all leases
+
+ for (var node : leases) {
+ node.release();
+ soft.assertThat(node.nodeIdIfValid()).isEqualTo(-1);
+ }
+
+ leases.clear();
+
+ // Lease fails, because the clock is not 'after' the
'expirationTimestamp'
+
+ soft.assertThatIllegalStateException()
+ .isThrownBy(mgmt::lease)
+ .withMessage("Could not lease any node ID");
+
+ // Advance clock
+
+ mutableClock.advanceBoth(Duration.ofSeconds(1));
+
+ // Repeat allocation of all nodes
+
+ for (int i = 0; i < numNodeIds; i++) {
+ soft.assertThatCode(() -> leases.add(mgmt.lease()))
+ .describedAs("n = %d", i)
+ .doesNotThrowAnyException();
+ }
+ soft.assertThatIllegalStateException()
+ .isThrownBy(mgmt::lease)
+ .withMessage("Could not lease any node ID");
+
+ soft.assertThat(leases).hasSize(numNodeIds);
+
+ m = mgmt;
+ }
+
+ soft.assertThatIllegalStateException()
+ .isThrownBy(m::lease)
+ .withMessage("NodeManagement instance has been closed");
+ }
+}
diff --git a/persistence/nosql/nodes/impl/src/test/resources/logback-test.xml
b/persistence/nosql/nodes/impl/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..4a4d9a629
--- /dev/null
+++ b/persistence/nosql/nodes/impl/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration debug="false">
+ <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/>
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+ <root level="${test.log.level:-INFO}">
+ <appender-ref ref="console"/>
+ </root>
+</configuration>
diff --git
a/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStore.java
b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStore.java
new file mode 100644
index 000000000..912069276
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStore.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.polaris.nodeids.spi.NodeState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+
+public class MockNodeStore implements NodeStore {
+ private final Map<Integer, NodeState> nodeStates = new ConcurrentHashMap<>();
+
+ @Nullable
+ @Override
+ public NodeState persist(
+ int nodeId, Optional<NodeState> expectedNodeState, @Nonnull NodeState
newState) {
+ if (nodeId >= 0 && expectedNodeState.isPresent()) {
+ var result =
+ nodeStates.computeIfPresent(
+ nodeId,
+ (key, existing) -> {
+ if (expectedNodeState.get().equals(existing)) {
+ return newState;
+ }
+ return existing;
+ });
+ if (result == newState) {
+ return newState;
+ }
+ } else {
+ if (nodeStates.putIfAbsent(nodeId, newState) == null) {
+ return newState;
+ }
+ }
+ return null;
+ }
+
+ @Nonnull
+ @Override
+ public NodeState[] fetchMany(@Nonnull int... nodeIds) {
+ var r = new NodeState[nodeIds.length];
+ for (int i = 0; i < nodeIds.length; i++) {
+ var id = nodeIds[i];
+ if (id >= 0) {
+ r[i] = nodeStates.get(id);
+ }
+ }
+ return r;
+ }
+
+ @Override
+ public Optional<NodeState> fetch(int nodeId) {
+ return Optional.ofNullable(nodeStates.get(nodeId));
+ }
+}
diff --git
a/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStoreFactory.java
b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStoreFactory.java
new file mode 100644
index 000000000..48ce35094
--- /dev/null
+++
b/persistence/nosql/nodes/impl/src/testFixtures/java/org/apache/polaris/nodeids/impl/MockNodeStoreFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.impl;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.nodeids.spi.NodeManagementState;
+import org.apache.polaris.nodeids.spi.NodeStore;
+import org.apache.polaris.nodeids.spi.NodeStoreFactory;
+
+public class MockNodeStoreFactory implements NodeStoreFactory {
+ private final AtomicReference<NodeManagementState> nodeState = new
AtomicReference<>();
+ private final NodeStore nodeStore;
+
+ public MockNodeStoreFactory() {
+ this(new MockNodeStore());
+ }
+
+ public MockNodeStoreFactory(NodeStore nodeStore) {
+ this.nodeStore = nodeStore;
+ }
+
+ @Override
+ public boolean storeManagementState(@Nonnull NodeManagementState state) {
+ return nodeState.compareAndSet(null, state);
+ }
+
+ @Override
+ public Optional<NodeManagementState> fetchManagementState() {
+ return Optional.ofNullable(nodeState.get());
+ }
+
+ @Nonnull
+ @Override
+ public NodeStore createNodeStore(@Nonnull IdGenerator idGenerator) {
+ return nodeStore;
+ }
+}
diff --git a/persistence/nosql/nodes/spi/build.gradle.kts
b/persistence/nosql/nodes/spi/build.gradle.kts
new file mode 100644
index 000000000..6a62d205a
--- /dev/null
+++ b/persistence/nosql/nodes/spi/build.gradle.kts
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris nodes SPI"
+
+dependencies {
+  // Other Polaris modules.
+  implementation(project(":polaris-idgen-api"))
+  implementation(project(":polaris-nodes-api"))
+
+  // polaris-immutables: compile-time only, plus its "processor"
+  // configuration on the annotation processor path.
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(
+    project(":polaris-immutables", configuration = "processor")
+  )
+
+  // Jackson; the databind version is left to the BOM platform.
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+
+  // Jakarta APIs, compile-time only.
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+}
diff --git
a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeManagementState.java
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeManagementState.java
new file mode 100644
index 000000000..558b1e1b6
--- /dev/null
+++
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeManagementState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGeneratorSpec;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * Node-management state as exchanged through {@code NodeStoreFactory}'s
+ * {@code storeManagementState} and {@code fetchManagementState} methods.
+ */
+public interface NodeManagementState {
+ /**
+  * ID generator specification carried by this state, if any.
+  *
+  * <p>NOTE(review): when this is empty is not visible here - confirm
+  * against the node management implementation.
+  */
+ Optional<IdGeneratorSpec> idGeneratorSpec();
+
+ /**
+  * Sub-interface of {@link NodeManagementState} that carries the
+  * {@code @PolarisImmutable} annotation.
+  *
+  * <p>NOTE(review): presumably the annotation generates an immutable
+  * implementation with a builder - confirm.
+  */
+ @PolarisImmutable
+ interface BuildableNodeManagementState extends NodeManagementState {}
+}
diff --git
a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeState.java
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeState.java
new file mode 100644
index 000000000..a461c39fc
--- /dev/null
+++
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import java.time.Instant;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * State of a single node as read and written through {@code NodeStore}.
+ *
+ * <p>NOTE(review): presumably {@code @PolarisImmutable} generates the
+ * immutable implementation of this interface - confirm.
+ */
+@PolarisImmutable
+public interface NodeState {
+ /**
+  * Timestamp associated with this node's lease.
+  *
+  * <p>NOTE(review): presumably the instant the lease was acquired or last
+  * renewed - confirm against the node management implementation.
+  */
+ Instant leaseTimestamp();
+
+ /** Timestamp since which this node's lease is no longer valid. */
+ Instant expirationTimestamp();
+}
diff --git
a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStore.java
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStore.java
new file mode 100644
index 000000000..8950be1b7
--- /dev/null
+++
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Optional;
+
+/** Abstraction of nodes persistence for the node management implementation. */
+public interface NodeStore {
+ Optional<NodeState> fetch(int nodeId);
+
+ @Nonnull
+ NodeState[] fetchMany(@Nonnull int... nodeIds);
+
+ @Nullable
+ NodeState persist(int nodeId, Optional<NodeState> expectedNodeState,
@Nonnull NodeState newState);
+}
diff --git
a/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStoreFactory.java
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStoreFactory.java
new file mode 100644
index 000000000..c59db0ed0
--- /dev/null
+++
b/persistence/nosql/nodes/spi/src/main/java/org/apache/polaris/nodeids/spi/NodeStoreFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.nodeids.spi;
+
+import jakarta.annotation.Nonnull;
+import java.util.Optional;
+import org.apache.polaris.ids.api.IdGenerator;
+
+/**
+ * Entry point to node persistence: gives access to the stored
+ * {@link NodeManagementState} and creates {@link NodeStore} instances.
+ */
+public interface NodeStoreFactory {
+  /**
+   * Fetches the stored management state.
+   *
+   * @return the management state; presumably empty when none has been
+   *     stored yet (TODO confirm)
+   */
+  Optional<NodeManagementState> fetchManagementState();
+
+  /**
+   * Stores the given management state.
+   *
+   * @param state management state to store
+   * @return presumably {@code true} when {@code state} was stored and
+   *     {@code false} when it was not, e.g. because a state already
+   *     exists (TODO confirm)
+   */
+  boolean storeManagementState(@Nonnull NodeManagementState state);
+
+  /**
+   * Creates the {@link NodeStore} to use for node state persistence.
+   *
+   * @param idGenerator ID generator made available to the store
+   * @return the node store, never {@code null}
+   */
+  @Nonnull
+  NodeStore createNodeStore(@Nonnull IdGenerator idGenerator);
+}