slfan1989 commented on code in PR #14238: URL: https://github.com/apache/iceberg/pull/14238#discussion_r2450756730
########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/EtcdLockFactory.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.KV; +import io.etcd.jetcd.kv.GetResponse; +import io.etcd.jetcd.lock.LockResponse; +import io.etcd.jetcd.lock.UnlockResponse; +import io.etcd.jetcd.options.GetOption; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Etcd backed implementation of the {@link EtcdLockFactory}. 
*/ +public class EtcdLockFactory implements TriggerLockFactory { + + private static final Logger LOG = LoggerFactory.getLogger(EtcdLockFactory.class); + private static final String LOCK_BASE_PATH = "/iceberg/flink/maintenance/locks/"; + + private final String etcdEndpoint; + private final String lockId; + private final Duration connectionTimeoutMs; + private final Duration keepAliveMs; + private final Duration keepAliveTimeoutMs; + private final int maxRetries; + private transient Client client; + private volatile boolean isOpen; + + /** + * Constructs a new EtcdLockFactory instance. + * + * @param etcdEndpoint The Etcd server endpoint (e.g., "http://127.0.0.1:2379") — required, no + * default + * @param lockId A unique identifier for the lock — required, no default + * @param connectionTimeoutMs Connection timeout in milliseconds (must be >= 0). Default (if not + * set elsewhere): 5000 ms + * @param keepAliveMs Interval in milliseconds for gRPC keepalive pings. Default: 30000 ms + * @param keepAliveTimeoutMs Timeout in milliseconds for gRPC keepalive before connection is + * considered dead. Default: 10000 ms + * @param maxRetries Maximum number of retries in case of connection or request failures. 
Default: + * 2 + * @throws NullPointerException if {@code etcdEndpoint} or {@code lockId} is null + * @throws IllegalArgumentException if {@code connectionTimeoutMs} is negative + */ + public EtcdLockFactory( + String etcdEndpoint, + String lockId, + int connectionTimeoutMs, + int keepAliveMs, + int keepAliveTimeoutMs, + int maxRetries) { + Preconditions.checkNotNull(etcdEndpoint, "Etcd endpoint cannot be null"); + Preconditions.checkNotNull(lockId, "Lock ID cannot be null"); + // connectionTimeout must be strictly positive ( > 0 ) + Preconditions.checkArgument( + connectionTimeoutMs > 0, + "Connection timeout must be greater than 0, got: %s", + connectionTimeoutMs); + + // keepAlive must be strictly positive + Preconditions.checkArgument( + keepAliveMs > 0, "KeepAlive interval must be greater than 0, got: %s", keepAliveMs); + + // keepAliveTimeout must be strictly positive + Preconditions.checkArgument( + keepAliveTimeoutMs > 0, + "KeepAlive timeout must be greater than 0, got: %s", + keepAliveTimeoutMs); + + // keepAliveTimeout should not exceed keepAlive interval + Preconditions.checkArgument( + keepAliveTimeoutMs < keepAliveMs, + "KeepAlive timeout (%s ms) must be less than keepAlive interval (%s ms)", + keepAliveTimeoutMs, + keepAliveMs); + + // maxRetries must be non-negative + Preconditions.checkArgument(maxRetries >= 0, "Max retries must be >= 0, got: %s", maxRetries); + + this.etcdEndpoint = etcdEndpoint; + this.lockId = lockId; + this.connectionTimeoutMs = Duration.ofMillis(connectionTimeoutMs); + this.keepAliveMs = Duration.ofMillis(keepAliveMs); + this.keepAliveTimeoutMs = Duration.ofMillis(keepAliveTimeoutMs); + this.maxRetries = maxRetries; + } + + @Override + public void open() { + if (isOpen) { + LOG.debug("EtcdLockFactory already opened for lockId: {}.", lockId); + return; + } + + try { + this.client = + Client.builder() + .endpoints(etcdEndpoint) + .connectTimeout(connectionTimeoutMs) + .retryMaxAttempts(maxRetries) + .keepaliveTime(keepAliveMs) + 
.keepaliveTimeout(keepAliveTimeoutMs) + .build(); + isOpen = true; + LOG.info("EtcdLockFactory initialized for lockId: {}.", lockId); + } catch (Exception e) { + closeQuietly(); + throw new RuntimeException("Failed to initialize EtcdLockFactory", e); + } + } + + @Override + public Lock createLock() { + return new EtcdLockFactory.EtcdLock(getTaskSharePath(), client); + } + + @Override + public Lock createRecoveryLock() { + return new EtcdLockFactory.EtcdLock(getRecoverySharedPath(), client); + } + + @Override + public void close() throws IOException { + try { + if (client != null) { + client.close(); + } + } finally { + isOpen = false; + } + } + + private void closeQuietly() { + try { + close(); + } catch (Exception e) { + LOG.warn("Failed to close EtcdLockFactory for lockId: {}", lockId, e); + } + } + + private String getTaskSharePath() { + return LOCK_BASE_PATH + lockId + "/task"; + } + + private String getRecoverySharedPath() { + return LOCK_BASE_PATH + lockId + "/recovery"; + } + + /** Etcd lock implementation */ + private static class EtcdLock implements Lock { + private final io.etcd.jetcd.Lock lockClient; + private final KV kvClient; + private final ByteSequence lockKey; + private final String lockPath; + private final AtomicReference<ByteSequence> lockKeyRef; + + private EtcdLock(String lockPath, Client client) { + this.lockClient = client.getLockClient(); + this.kvClient = client.getKVClient(); + this.lockPath = lockPath; + this.lockKey = ByteSequence.from(lockPath, StandardCharsets.UTF_8); + this.lockKeyRef = new AtomicReference<>(null); + } + + @Override + public boolean tryLock() { + try { + if (isHeld()) { + LOG.warn("Lock is already held for path: {}", lockPath); + return false; + } + + CompletableFuture<LockResponse> lockFuture = lockClient.lock(lockKey, 0L); Review Comment: Thank you for raising this valuable question! Meaning of `0L`: In the jetcd client, the second parameter of lockClient.lock is the lease ID. 
 Passing `0L` means no lease is used, and the lock remains valid until explicitly released. Regarding the choice of `0L` in `lockClient.lock(lockKey, 0L)`: etcd's lease mechanism is indeed very flexible, allowing a lock to expire automatically by binding it to a lease ID, which effectively prevents stale-lock issues. However, fully implementing the lease mechanism requires additional logic, such as lease creation, management, renewal, and cleanup in exceptional cases, which would increase code complexity. In the current implementation, we chose `0L` (indicating no lease binding) to simplify the lock logic. The `unlock` method explicitly releases the lock. To further enhance robustness, we have introduced timeout mechanisms in the `tryLock`, `isHeld`, and `unlock` methods (e.g., `future.orTimeout`) to prevent lock operations from blocking indefinitely. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
