Copilot commented on code in PR #2197:
URL: https://github.com/apache/phoenix/pull/2197#discussion_r2155677761


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class implements synchronous replication to a standby cluster's HDFS. It writes replication
+ * logs directly to the standby cluster in synchronous mode, providing immediate consistency for
+ * failover scenarios.
+ */
+public class StandbyLogGroupWriter extends ReplicationLogGroupWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandbyLogGroupWriter.class);
+
+    private FileSystem standbyFs;
+    private URI standbyUrl;
+    protected int numShards;
+    protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor for StandbyLogGroupWriter.
+     */
+    public StandbyLogGroupWriter(Configuration conf, ServerName serverName, String haGroupId) {
+        super(conf, serverName, haGroupId);
+        this.numShards = conf.getInt(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY,
+            ReplicationLogGroup.DEFAULT_REPLICATION_NUM_SHARDS);
+        LOG.debug("Created StandbyLogGroupWriter for HA Group: {}", haGroupId);
+    }
+
+    @Override
+    protected void initializeFileSystems() throws IOException {
+        if (numShards > ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS) {
+            throw new IllegalArgumentException(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY
+                + " is " + numShards + ", but the limit is "
+                + ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS);
+        }
+        String standbyUrlString = conf.get(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+        if (standbyUrlString == null || standbyUrlString.trim().isEmpty()) {
+            throw new IOException("Standby HDFS URL not configured: "
+                + ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+        }
+        try {
+            standbyUrl = new URI(standbyUrlString);
+            standbyFs = getFileSystem(standbyUrl);
+            LOG.info("Initialized standby filesystem: {}", standbyUrl);
+        } catch (URISyntaxException e) {
+            throw new IOException("Invalid standby HDFS URL: " + 
standbyUrlString, e);
+        }
+    }
+
+    /**
+     * Creates a new log file path in a sharded directory structure based on server name and
+     * timestamp. The resulting path structure is
+     * <pre>
+     * [url]/[haGroupId]/[shard]/[timestamp]-[servername].plog
+     * </pre>
+     */
+    protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+        Path haGroupPath = new Path(url.getPath(), haGroupId);
+        long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+        // To have all logs for a given regionserver appear in the same shard, we could hash only
+        // the serverName. However, we expect some regionservers will have significantly more load
+        // than others, so we instead distribute the logs over all of the shards randomly, for a
+        // more even overall distribution, by also hashing the timestamp.
+        int shard = (serverName.hashCode() ^ Long.hashCode(timestamp)) % numShards;

Review Comment:
   The shard calculation may yield a negative value: the XOR of the two hash codes can be negative, and Java's % operator preserves the sign of the dividend. Consider a positive modulo approach such as Math.floorMod to ensure the shard index is non-negative (Math.abs alone is not sufficient, since Math.abs(Integer.MIN_VALUE) is itself negative).
   ```suggestion
        int shard = Math.floorMod(serverName.hashCode() ^ Long.hashCode(timestamp), numShards);
   ```
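
   For illustration, here is a minimal standalone sketch (not code from the PR; the hash value is an arbitrary example) showing why Math.floorMod is preferable to both % and Math.abs here:

   ```java
   public class ShardIndexDemo {
       public static void main(String[] args) {
           int numShards = 1000;
           // An XOR'd hash with the sign bit set, as serverName.hashCode() ^ Long.hashCode(ts)
           // can legitimately produce.
           int hash = -1544706767;
           System.out.println(hash % numShards);               // -767: not a valid shard index
           System.out.println(Math.floorMod(hash, numShards)); // 233: always in [0, numShards)
           // Math.abs is not a safe fix: Math.abs(Integer.MIN_VALUE) overflows and stays negative.
           System.out.println(Math.abs(Integer.MIN_VALUE) % numShards); // -648
       }
   }
   ```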



##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReplicationLogGroup manages a group of replication logs for a given HA Group.
+ * <p>
+ * This class provides an API for replication operations and delegates to either synchronous
+ * replication (StandbyLogGroupWriter) or store-and-forward replication
+ * (StoreAndForwardLogGroupWriter) based on the current replication mode.
+ * <p>
+ * Key features:
+ * <ul>
+ *   <li>Manages multiple replication logs for an HA Group</li>
+ *   <li>Provides append() and sync() API for higher layers</li>
+ *   <li>Delegates to appropriate writer implementation based on replication mode</li>
+ *   <li>Thread-safe operations</li>
+ * </ul>
+ * <p>
+ * The class delegates actual replication work to implementations of ReplicationLogGroupWriter:
+ * <ul>
+ *   <li>StandbyLogGroupWriter: Synchronous replication to standby cluster</li>
+ *   <li>StoreAndForwardLogGroupWriter: Local storage with forwarding when available</li>
+ * </ul>
+ */
+public class ReplicationLogGroup {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroup.class);
+
+    // Configuration constants from original ReplicationLog
+    public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+        "phoenix.replication.log.standby.hdfs.url";
+    public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+        "phoenix.replication.log.fallback.hdfs.url";
+    public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
+    public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+    public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+    public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+        "phoenix.replication.log.rotation.time.ms";
+    public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
+    public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+        "phoenix.replication.log.rotation.size.bytes";
+    public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
+    public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+        "phoenix.replication.log.rotation.size.percentage";
+    public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+    public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+        "phoenix.replication.log.compression";
+    public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
+    public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+        "phoenix.replication.log.ringbuffer.size";
+    public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32;
+    public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+        "phoenix.replication.log.sync.timeout.ms";
+    public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+    public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+        "phoenix.replication.log.sync.retries";
+    public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+    public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+        "phoenix.replication.log.rotation.retries";
+    public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+    public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+        "phoenix.replication.log.retry.delay.ms";
+    public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+    public static final String SHARD_DIR_FORMAT = "%05d";
+    public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+    /** Cache of ReplicationLogGroup instances by HA Group ID */
+    private static final ConcurrentHashMap<String, ReplicationLogGroup> instances =
+        new ConcurrentHashMap<>();
+
+    private final Configuration conf;
+    private final ServerName serverName;
+    private final String haGroupId;
+    private volatile ReplicationLogGroupWriter writer;
+    private volatile boolean closed = false;
+
+    /**
+     * The current replication mode. Always SYNC for now.
+     * <p>TODO: Implement mode transitions to STORE_AND_FORWARD when standby becomes unavailable.
+     * <p>TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
+     */
+    protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
+
+    /**
+     * Tracks the current replication mode of the ReplicationLog.
+     * <p>
+     * The replication mode determines how mutations are handled:
+     * <ul>
+     *   <li>SYNC: Normal operation where mutations are written directly to the standby cluster's
+     *   HDFS. This is the default and primary mode of operation.</li>
+     *   <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable.
+     *   Mutations are stored locally and will be forwarded when connectivity is restored.</li>
+     *   <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the
+     *   standby cluster while concurrently draining the local queue of previously stored
+     *   mutations.</li>
+     *   mutations.</li>
+     * </ul>
+     * <p>
+     * Mode transitions occur automatically based on the availability of the standby cluster's HDFS
+     * and the state of the local mutation queue.
+     */
+    protected enum ReplicationMode {
+        /**
+         * Normal operation where mutations are written directly to the standby cluster's HDFS.
+         * This is the default and primary mode of operation.
+         */
+        SYNC,
+
+        /**
+         * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored
+         * locally and will be forwarded when connectivity is restored.
+         */
+        STORE_AND_FORWARD,
+
+        /**
+         * Transitional mode where new mutations are written directly to the standby cluster
+         * while concurrently draining the local queue of previously stored mutations. This mode
+         * is entered when connectivity to the standby cluster is restored while there are still
+         * mutations in the local queue.
+         */
+        SYNC_AND_FORWARD;
+    }
+
+    /**
+     * Get or create a ReplicationLogGroup instance for the given HA Group.
+     *
+     * @param conf Configuration object
+     * @param serverName The server name
+     * @param haGroupId The HA Group identifier
+     * @return ReplicationLogGroup instance
+     * @throws RuntimeException if initialization fails
+     */
+    public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
+            String haGroupId) {
+        return instances.computeIfAbsent(haGroupId, k -> {
+            try {
+                ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupId);
+                group.init();
+                return group;
+            } catch (IOException e) {
+                LOG.error("Failed to create ReplicationLogGroup for HA Group: 
{}", haGroupId, e);
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Protected constructor for ReplicationLogGroup.
+     *
+     * @param conf Configuration object
+     * @param serverName The server name
+     * @param haGroupId The HA Group identifier
+     */
+    protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupId) {
+        this.conf = conf;
+        this.serverName = serverName;
+        this.haGroupId = haGroupId;
+    }
+
+    /**
+     * Initialize the ReplicationLogGroup by creating the appropriate writer implementation.
+     *
+     * @throws IOException if initialization fails
+     */
+    protected void init() throws IOException {
+        // Start with synchronous replication (StandbyLogGroupWriter). Later we can add logic to
+        // determine the appropriate writer based on configuration or HA Group state.
+        writer = new StandbyLogGroupWriter(conf, serverName, haGroupId);
+        writer.init();
+        LOG.info("Initialized ReplicationLogGroup for HA Group: {}", 
haGroupId);
+    }
+
+    /**
+     * Get the current metrics source for monitoring operations.
+     *
+     * @return MetricsReplicationLogGroupSource instance
+     */
+    public MetricsReplicationLogGroupSource getMetrics() {
+        return writer != null ? writer.getMetrics() : null;
+    }
+
+    /**
+     * Get the HA Group ID managed by this instance.
+     *
+     * @return HA Group ID
+     */
+    public String getHaGroupId() {
+        return haGroupId;
+    }
+
+    /**
+     * Append a mutation to the replication log group. This operation is normally non-blocking
+     * unless the ring buffer is full.
+     *
+     * @param tableName The name of the HBase table the mutation applies to
+     * @param commitId The commit identifier (e.g., SCN) associated with the mutation
+     * @param mutation The HBase Mutation (Put or Delete) to be logged
+     * @throws IOException If the operation fails
+     */
+    public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+        if (closed) {
+            throw new IOException("Closed");
+        }
+        writer.append(tableName, commitId, mutation);
+    }
+
+    /**
+     * Ensure all previously appended records are durably persisted. This method blocks until the
+     * sync operation completes or fails.
+     *
+     * @throws IOException If the sync operation fails
+     */
+    public void sync() throws IOException {
+        if (closed) {
+            throw new IOException("Closed");
+        }
+        writer.sync();
+    }
+
+    /**
+     * Check if this ReplicationLogGroup is closed.
+     *
+     * @return true if closed, false otherwise
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Close the ReplicationLogGroup and all associated resources.
+     * This method is thread-safe and can be called multiple times.
+     */
+    public void close() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            closeWriter(writer);
+            // Remove from instances cache
+            instances.remove(haGroupId);
+            LOG.info("Closed ReplicationLogGroup for HA Group: {}", haGroupId);
+        }
+    }
+
+    /**
+     * Close the given writer.
+     *
+     * @param writer The writer to close
+     */
+    protected void closeWriter(ReplicationLogGroupWriter writer) {
+        if (writer != null) {
+            writer.close();
+        }
+    }
+
+    /**
+     * Switch the writer implementation (e.g., from synchronous to store-and-forward). This method
+     * is thread-safe and ensures proper cleanup of the old writer.
+     *
+     * @param writer The new writer implementation
+     * @throws IOException if the switch fails
+     */
+    protected void switchWriter(ReplicationLogGroupWriter writer) throws IOException {
+        synchronized (this) {
+            if (closed) {
+                throw new IOException("Closed");
+            }
+
+            LOG.info("Switching writer for HA Group {} from {} to {}",
+                haGroupId, writer.getClass().getSimpleName(), 
writer.getClass().getSimpleName());
+
+            ReplicationLogGroupWriter oldWriter = this.writer;

Review Comment:
   The log message in switchWriter() is using the same writer class for both the old and new writer. It should display the class name of the old writer (e.g., oldWriter.getClass().getSimpleName()) for clarity.
   ```suggestion
            ReplicationLogGroupWriter oldWriter = this.writer;
            LOG.info("Switching writer for HA Group {} from {} to {}",
                haGroupId, oldWriter != null ? oldWriter.getClass().getSimpleName() : "null",
                writer.getClass().getSimpleName());
   ```
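
   For context, here is a self-contained sketch (ShadowingDemo is hypothetical, not code from the PR) of the shadowing pitfall behind this bug: the method parameter writer shadows the field this.writer, so referencing writer.getClass() for both placeholders logs the new class twice:

   ```java
   public class ShadowingDemo {
       private Object writer = new java.util.ArrayList<String>(); // stands in for the old writer

       void switchWriter(Object writer) { // parameter shadows the field above
           Object oldWriter = this.writer;
           // Buggy: both placeholders resolve to the method parameter.
           System.out.printf("from %s to %s%n",
               writer.getClass().getSimpleName(), writer.getClass().getSimpleName());
           // Fixed: use the captured field for the "from" side.
           System.out.printf("from %s to %s%n",
               oldWriter.getClass().getSimpleName(), writer.getClass().getSimpleName());
           this.writer = writer;
       }

       public static void main(String[] args) {
           new ShadowingDemo().switchWriter(new java.util.HashMap<String, String>());
           // prints: from HashMap to HashMap   (buggy)
           //         from ArrayList to HashMap (fixed)
       }
   }
   ```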


