tkhurana commented on code in PR #2379:
URL: https://github.com/apache/phoenix/pull/2379#discussion_r2968555490


##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling:
+ * 1. Mapper-level checkpointing (skip completed mapper regions on restart)
+ * 2. Chunk-level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  // Name of the Phoenix table that stores sync-tool checkpoint rows.
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = 
"PHOENIX_SYNC_TABLE_CHECKPOINT";
+  // Checkpoint rows age out automatically via the table's TTL property (set in the DDL below).
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 
days
+  // Shared Phoenix connection; lifecycle is owned by the caller, never closed here.
+  private final Connection connection;
+  // Upsert over all 12 checkpoint columns; the ?-placeholder order must match the
+  // parameter-binding order used in checkpointSyncTableResult.
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, 
FROM_TIME, TO_TIME,"
+    + " START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, 
EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: 
The connection is
+   * stored as-is and shared across operations. The caller retains ownership 
and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository 
lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT 
NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    START_ROW_KEY 
VARBINARY_ENCODED,\n"
+      + "    END_ROW_KEY VARBINARY_ENCODED,\n" + "    IS_DRY_RUN BOOLEAN, \n"
+      + "    EXECUTION_START_TIME TIMESTAMP,\n" + "    EXECUTION_END_TIME 
TIMESTAMP,\n"
+      + "    STATUS VARCHAR(20),\n" + "    COUNTERS VARCHAR(255), \n"
+      + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n" + "      
  TARGET_CLUSTER,\n"
+      + "        TYPE ,\n" + "        FROM_TIME,\n" + "        TO_TIME,\n"
+      + "        START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(String tableName, String 
targetCluster, Type type,
+    Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] 
endKey, Status status,
+    Timestamp executionStartTime, Timestamp executionEndTime, String counters) 
throws SQLException {
+
+    // Validate required parameters
+    if (tableName == null || tableName.isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty 
for checkpoint");
+    }
+    if (targetCluster == null || targetCluster.isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or 
empty for checkpoint");
+    }
+    if (type == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (fromTime == null || toTime == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null 
for checkpoint");
+    }
+
+    try (PreparedStatement ps = 
connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, tableName);
+      ps.setString(2, targetCluster);
+      ps.setString(3, type.name());
+      ps.setLong(4, fromTime);
+      ps.setLong(5, toTime);
+      ps.setBytes(6, startKey);
+      ps.setBytes(7, endKey);
+      ps.setBoolean(8, isDryRun);
+      ps.setTimestamp(9, executionStartTime);
+      ps.setTimestamp(10, executionEndTime);
+      ps.setString(11, status != null ? status.name() : null);
+      ps.setString(12, counters);
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat 
to filter out
+   * already-processed regions.
+   * @param tableName     Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime      Start timestamp (nullable)
+   * @param toTime        End timestamp (nullable)
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String 
tableName,
+    String targetCluster, Long fromTime, Long toTime) throws SQLException {
+
+    String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ?  AND TARGET_CLUSTER = ?"
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, 
?)";

Review Comment:
   There are only 2 possible statuses, so does it make sense to set them in the query? If you don't, then you are only querying PK columns without any additional filter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to