haridsv commented on code in PR #2379:
URL: https://github.com/apache/phoenix/pull/2379#discussion_r2989688144


##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+
+/**
+ * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk
+ * bundled SHA, since their digest can't be serialized/deserialized which is needed for
+ * PhoenixSyncTableTool for cross-region hash continuation.
+ */
+public class SHA256DigestUtil {
+
+  /**
+   * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up

Review Comment:
   Can you point me to the documentation on the size being ~96 bytes?



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+
+/**
+ * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk
+ * bundled SHA, since their digest can't be serialized/deserialized which is needed for
+ * PhoenixSyncTableTool for cross-region hash continuation.
+ */
+public class SHA256DigestUtil {
+
+  /**
+   * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up
+   * to 128 bytes as buffer.
+   */
+  public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128;
+
+  /**
+   * Encodes a SHA256Digest state to a byte array with length prefix for validation. Format: [4-byte
+   * integer length][encoded digest state bytes]
+   * @param digest The digest whose state should be encoded
+   * @return Byte array containing integer length prefix + encoded state
+   */
+  public static byte[] encodeDigestState(SHA256Digest digest) {
+    byte[] encoded = digest.getEncodedState();
+    ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length);
+    buffer.putInt(encoded.length);
+    buffer.put(encoded);
+    return buffer.array();
+  }

Review Comment:
   Since MAX_SHA256_DIGEST_STATE_SIZE is capped at 128 bytes, using a 4-byte integer and ByteBuffer for the length prefix is slightly over-engineered. We can optimize this by using a single byte for the length and Bytes.add() for concatenation. This would allow us to remove the ByteBuffer, ByteArrayInputStream, and DataInputStream dependencies in these utility methods.
   
   ```suggestion
   public static byte[] encodeDigestState(SHA256Digest digest) {
       byte[] encoded = digest.getEncodedState();
       // Use an unsigned byte as 128 > Byte.MAX_VALUE
       return Bytes.add(new byte[]{(byte) (encoded.length & 0xff)}, encoded);
   }
   ```
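   If the single-byte prefix above is adopted, the decode side needs the matching unsigned read. A minimal pure-Java sketch of the round trip (a plain `byte[]` stands in for the Bouncy Castle encoded state, and `System.arraycopy` for `Bytes.add()`; names here are illustrative, not from the PR):

```java
import java.util.Arrays;

// Illustrative sketch only: single unsigned-byte length prefix with the
// matching decode/validation side. Real code would use Bytes.add() and
// SHA256Digest.getEncodedState().
class LengthPrefixSketch {

  // Prepend the state length as one unsigned byte (valid while the cap stays <= 255).
  static byte[] encode(byte[] state) {
    byte[] out = new byte[1 + state.length];
    out[0] = (byte) (state.length & 0xff); // 128 > Byte.MAX_VALUE, so mask to unsigned
    System.arraycopy(state, 0, out, 1, state.length);
    return out;
  }

  // Read the unsigned length back and validate it against the payload size.
  static byte[] decode(byte[] encoded) {
    int len = encoded[0] & 0xff; // & 0xff recovers values > 127
    if (len != encoded.length - 1) {
      throw new IllegalArgumentException("Corrupt digest state: bad length prefix");
    }
    return Arrays.copyOfRange(encoded, 1, 1 + len);
  }
}
```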



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Data model class representing required row in the PHOENIX_SYNC_TABLE_CHECKPOINT table
+ */
+public class PhoenixSyncTableOutputRow {
+
+  public enum Type {
+    CHUNK,
+    MAPPER_REGION
+  }
+
+  public enum Status {
+    VERIFIED,
+    MISMATCHED
+  }
+
+  private String tableName;
+  private String targetCluster;
+  private Type type;
+  private Long fromTime;
+  private Long toTime;
+  private Boolean isDryRun;
+  private byte[] startRowKey;
+  private byte[] endRowKey;
+  private Boolean isFirstRegion;

Review Comment:
   Is this being used?



##########
phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java:
##########
@@ -199,4 +199,23 @@ public static long getMaxLookbackInMillis(Configuration conf) {
 
   /** Exposed for testing */
  public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
+
+  /**
+   * PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum
+   */
+  public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";
+  public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes";
+  public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState";

Review Comment:
   Can you add JavaDoc on all 3 constants individually with a description of what the attribute is and what type of value it would contain?
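   For reference, one possible shape for the requested JavaDoc. The descriptions below are inferred from how PhoenixSyncTableMapper sets these attributes, so treat them as assumptions for the author to correct:

```java
// Hypothetical JavaDoc sketch; constant names and values match the diff,
// but the descriptions are inferred from mapper usage, not authoritative.
class SyncTableScanAttributes {

  /**
   * Marker attribute (set to TRUE_BYTES) telling the coprocessor that this
   * scan is a PhoenixSyncTableTool chunk-formation scan.
   */
  public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";

  /**
   * Target chunk size in bytes, encoded via Bytes.toBytes(long). Set only on
   * source-side scans, which drive the chunk boundaries.
   */
  public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes";

  /**
   * Serialized SHA-256 digest state (byte[]) from a previous region's partial
   * chunk; the coprocessor resumes hashing from this state for cross-region
   * continuation.
   */
  public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState";
}
```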



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+
+/**
+ * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk
+ * bundled SHA, since their digest can't be serialized/deserialized which is needed for
+ * PhoenixSyncTableTool for cross-region hash continuation.
+ */
+public class SHA256DigestUtil {
+
+  /**
+   * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up
+   * to 128 bytes as buffer.
+   */
+  public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128;
+
+  /**
+   * Encodes a SHA256Digest state to a byte array with length prefix for validation. Format: [4-byte
+   * integer length][encoded digest state bytes]
+   * @param digest The digest whose state should be encoded
+   * @return Byte array containing integer length prefix + encoded state
+   */
+  public static byte[] encodeDigestState(SHA256Digest digest) {
+    byte[] encoded = digest.getEncodedState();
+    ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length);
+    buffer.putInt(encoded.length);
+    buffer.put(encoded);
+    return buffer.array();
+  }

Review Comment:
   BTW, can you tell me why we need to encode the length into it? You are using `PhoenixKeyValueUtil.newKeyValue` which is already encoding the length of the `byte[]` anyway.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java:
##########
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SHA256DigestUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mapper that acts as a driver for validating table data between source and target clusters. The
+ * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches
+ * chunk hashes from both clusters, compares them and write to checkpoint table.
+ */
+public class PhoenixSyncTableMapper
+  extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class);
+
+  public enum SyncCounters {
+    CHUNKS_VERIFIED,
+    CHUNKS_MISMATCHED,
+    SOURCE_ROWS_PROCESSED,
+    TARGET_ROWS_PROCESSED
+  }
+
+  private String tableName;
+  private String targetZkQuorum;
+  private Long fromTime;
+  private Long toTime;
+  private boolean isDryRun;
+  private long chunkSizeBytes;
+  private Configuration conf;
+  private Connection sourceConnection;
+  private Connection targetConnection;
+  private Connection globalConnection;
+  private PTable pTable;
+  private byte[] physicalTableName;
+  private byte[] mapperRegionStart;
+  private byte[] mapperRegionEnd;
+  private PhoenixSyncTableOutputRepository syncTableOutputRepository;
+  private Timestamp mapperStartTime;
+
+  @Override
+  protected void setup(Context context) throws InterruptedException {
+    try {
+      super.setup(context);
+      mapperStartTime = new Timestamp(System.currentTimeMillis());
+      this.conf = context.getConfiguration();
+      tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf);
+      targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf);
+      fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf);
+      toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf);
+      isDryRun = PhoenixConfigurationUtil.getPhoenixSyncTableDryRun(conf);
+      chunkSizeBytes = PhoenixConfigurationUtil.getPhoenixSyncTableChunkSizeBytes(conf);
+      extractRegionBoundariesFromSplit(context);
+      sourceConnection = ConnectionUtil.getInputConnection(conf);
+      pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName);
+      physicalTableName = pTable.getPhysicalName().getBytes();
+      connectToTargetCluster();
+      globalConnection = createGlobalConnection(conf);
+      syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection);
+    } catch (SQLException | IOException e) {
+      tryClosingResources();
+      throw new RuntimeException(
+        String.format("Failed to setup PhoenixSyncTableMapper for table: %s", tableName), e);
+    }
+  }
+
+  /**
+   * Extracts mapper region boundaries from the PhoenixInputSplit
+   */
+  private void extractRegionBoundariesFromSplit(Context context) {
+    PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit();
+    KeyRange keyRange = split.getKeyRange();
+    if (keyRange == null) {
+      throw new IllegalStateException(String.format(
+        "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.",
+        tableName));
+    }
+    mapperRegionStart = keyRange.getLowerRange();
+    mapperRegionEnd = keyRange.getUpperRange();
+  }
+
+  /**
+   * Connects to the target cluster using the target ZK quorum, port, znode, krb principal
+   */
+  private void connectToTargetCluster() throws SQLException, IOException {
+    Configuration targetConf =
+      PhoenixMapReduceUtil.createConfigurationForZkQuorum(conf, targetZkQuorum);
+    targetConnection = ConnectionUtil.getInputConnection(targetConf);
+  }
+
+  /**
+   * Creates a global (non-tenant) connection for the checkpoint table.
+   */
+  private Connection createGlobalConnection(Configuration conf) throws SQLException {
+    Configuration globalConf = new Configuration(conf);
+    globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+    globalConf.unset(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+    return ConnectionUtil.getInputConnection(globalConf);
+  }
+
+  /**
+   * Processes a mapper region by comparing chunks between source and target clusters. Gets already
+   * processed chunks from checkpoint table, resumes from check pointed progress and records final
+   * status for chunks & mapper (VERIFIED/MISMATCHED).
+   */
+  @Override
+  protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context)
+    throws IOException, InterruptedException {
+    context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+    try {
+      List<PhoenixSyncTableOutputRow> processedChunks =
+        syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime,
+          mapperRegionStart, mapperRegionEnd);
+      List<Pair<byte[], byte[]>> unprocessedRanges =
+        calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, processedChunks);
+      boolean isStartKeyInclusive = shouldStartKeyBeInclusive(mapperRegionStart, processedChunks);
+      for (Pair<byte[], byte[]> range : unprocessedRanges) {
+        processMapperRanges(range.getFirst(), range.getSecond(), isStartKeyInclusive, context);
+        isStartKeyInclusive = false;
+      }
+
+      long mismatchedChunk = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
+      long verifiedChunk = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
+      long sourceRowsProcessed = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
+      long targetRowsProcessed = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
+      Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis());
+      String counters = formatMapperCounters(verifiedChunk, mismatchedChunk, sourceRowsProcessed,
+        targetRowsProcessed);
+
+      if (sourceRowsProcessed > 0) {
+        if (mismatchedChunk == 0) {
+          context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+          syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum,
+            PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun,
+            mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.VERIFIED,
+            mapperStartTime, mapperEndTime, counters);
+          LOGGER.info(
+            "PhoenixSyncTable mapper completed with verified: {} verified chunks, {} mismatched chunks",
+            verifiedChunk, mismatchedChunk);
+        } else {
+          context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+          LOGGER.warn(
+            "PhoenixSyncTable mapper completed with mismatch: {} verified chunks, {} mismatched chunks",
+            verifiedChunk, mismatchedChunk);
+          syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum,
+            PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun,
+            mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.MISMATCHED,
+            mapperStartTime, mapperEndTime, counters);
+        }
+      } else {
+        LOGGER.info(
+          "No rows pending to process. All mapper region boundaries are covered for startKey:{}, endKey: {}",
+          mapperRegionStart, mapperRegionEnd);
+      }
+    } catch (SQLException e) {
+      tryClosingResources();
+      throw new RuntimeException("Error processing PhoenixSyncTableMapper", e);
+    }
+  }
+
+  /**
+   * Processes a chunk range by comparing source and target cluster data. Source chunking: Breaks
+   * data into size-based chunks within given mapper region boundary. Target chunking: Follows
+   * source chunk boundaries. Source chunk boundary might be split across multiple target region, if
+   * so coproc signals for partial chunk with partial digest. Once entire Source chunk is covered by
+   * target scanner, we calculate resulting checksum from combined digest.
+   * @param rangeStart                Range start key
+   * @param rangeEnd                  Range end key
+   * @param isSourceStartKeyInclusive Whether startKey be inclusive for source chunking
+   * @param context                   Mapper context for progress and counters
+   * @throws IOException  if scan fails
+   * @throws SQLException if database operations fail
+   */
+  private void processMapperRanges(byte[] rangeStart, byte[] rangeEnd,
+    boolean isSourceStartKeyInclusive, Context context) throws IOException, SQLException {
+    // To handle scenario of target having extra keys compared to source keys:
+    // For every source chunk, we track whether its first chunk of Region or whether its lastChunk
+    // of region
+    // For every source chunk, we issue scan on target with
+    // - FirstChunkOfRegion : target scan start boundary would be rangeStart
+    // - LastChunkOfRegion : target scan end boundary would be rangeEnd
+    // - notFirstChunkOfRegion: target scan start boundary would be previous source chunk endKey
+    // - notLastChunkOfRegion: target scan end boundary would be current source chunk endKey
+    // Lets understand with an example.
+    // Source region boundary is [c,n) and source chunk returns [c1,d] , here `c` key is not present
+    // in source
+    // It could be the case that target has `c` present, so we issue scan on target chunk with
+    // startKey as `c` and not `c1` i.e [c,d]
+    // Similarly, if two consecutive source chunk returns its boundary as [e,g] and [h,j]
+    // When target is scanning for [h,j], it would issue scan with (g,j] to ensure we cover any
+    // extra key which is not in source but present in target
+    //
+    // Now eventually when chunking will reach for last source chunk on this region boundary, we
+    // again pass rangeEnd(with Exclusive) as target chunk boundary.
+    // Lets say, for above region boundary example second last and last sourceChunk returns [j,k]
+    // and [l,m]. Target chunk would issue scan for last chunk (k,n)
+    boolean isLastChunkOfRegion = false;
+    // We only want target startKey to be inclusive if source startKey is inclusive as well
+    // Source start key won't be inclusive if start of region boundary is already processed as chunk
+    // and check pointed
+    // Refer to shouldStartKeyBeInclusive() method to understand more about when source start key
+    // would be exclusive
+    boolean isTargetStartKeyInclusive = isSourceStartKeyInclusive;
+    try (ChunkScannerContext sourceScanner = createChunkScanner(sourceConnection, rangeStart,
+      rangeEnd, null, isSourceStartKeyInclusive, false, false)) {
+      ChunkInfo previousSourceChunk = null;
+      ChunkInfo sourceChunk = sourceScanner.getNextChunk();
+      while (sourceChunk != null) {
+        sourceChunk.executionStartTime = new Timestamp(System.currentTimeMillis());
+        // Peek ahead to see if this is the last chunk
+        ChunkInfo nextSourceChunk = sourceScanner.getNextChunk();
+        if (nextSourceChunk == null) {
+          isLastChunkOfRegion = true;
+        }
+        ChunkInfo targetChunk = getTargetChunkWithSourceBoundary(targetConnection,
+          previousSourceChunk == null ? rangeStart : previousSourceChunk.endKey,
+          isLastChunkOfRegion ? rangeEnd : sourceChunk.endKey, isTargetStartKeyInclusive,
+          !isLastChunkOfRegion);
+        context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).increment(sourceChunk.rowCount);
+        context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).increment(targetChunk.rowCount);
+        boolean matched = MessageDigest.isEqual(sourceChunk.hash, targetChunk.hash);
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+            "isSourceStartKeyInclusive: {}, isTargetStartKeyInclusive: {},"
+              + "isTargetEndKeyInclusive: {}, isFirstChunkOfRegion: {}, isLastChunkOfRegion: {}."
+              + "Chunk comparison source {}, {}. Key range passed to target chunk: {}, {}."
+              + "target chunk returned {}, {}: source={} rows, target={} rows, matched={}",
+            isSourceStartKeyInclusive, isTargetStartKeyInclusive, !isLastChunkOfRegion,
+            previousSourceChunk == null, isLastChunkOfRegion,
+            Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey),
+            Bytes.toStringBinary(
+              previousSourceChunk == null ? rangeStart : previousSourceChunk.endKey),
+            Bytes.toStringBinary(isLastChunkOfRegion ? rangeEnd : sourceChunk.endKey),
+            Bytes.toStringBinary(targetChunk.startKey), Bytes.toStringBinary(targetChunk.endKey),
+            sourceChunk.rowCount, targetChunk.rowCount, matched);
+        }
+        sourceChunk.executionEndTime = new Timestamp(System.currentTimeMillis());
+        String counters = formatChunkCounters(sourceChunk.rowCount, targetChunk.rowCount);
+        if (matched) {
+          handleVerifiedChunk(sourceChunk, context, counters);
+        } else {
+          handleMismatchedChunk(sourceChunk, context, counters);
+        }
+        previousSourceChunk = sourceChunk;
+        sourceChunk = nextSourceChunk;
+        // After first chunk, our target chunk boundary would be previousSourceChunk.endKey,
+        // so start key should not be inclusive
+        isTargetStartKeyInclusive = false;
+        context.progress();
+      }
+    }
+    LOGGER.info("Completed sync table processing of Mapper region boundary {}, {}",
+      Bytes.toStringBinary(rangeStart), Bytes.toStringBinary(rangeEnd));
+  }
+
+  /**
+   * Scans target across multiple regions and returns a single combined ChunkInfo. Handles partial
+   * chunks by passing digest state to next scanner via scan attributes, enabling cross-region
+   * digest continuation. Since we are scanning rows based on source chunk boundary, it could be
+   * distributed across multiple target regions. We keep on creating scanner across target region
+   * until entire source chunk boundary is processed or chunk is null
+   * @param conn     Target connection
+   * @param startKey Source chunk start key
+   * @param endKey   Source chunk end key
+   * @return Single ChunkInfo with final hash from all target regions
+   */
+  private ChunkInfo getTargetChunkWithSourceBoundary(Connection conn, byte[] startKey,
+    byte[] endKey, boolean isTargetStartKeyInclusive, boolean isTargetEndKeyInclusive)
+    throws IOException, SQLException {
+    ChunkInfo combinedTargetChunk = new ChunkInfo();
+    combinedTargetChunk.startKey = null;
+    combinedTargetChunk.endKey = null;
+    combinedTargetChunk.hash = null;
+    combinedTargetChunk.rowCount = 0;
+    byte[] currentStartKey = startKey;
+    byte[] continuedDigestState = null;
+    ChunkInfo chunk;
+    while (true) {
+      // Each iteration scans one target region. The coprocessor processes all rows in
+      // that region within the scan range. For target boundary, the chunk is always
+      // marked partial and the digest state is passed to the next
+      // scanner for cross-region hash continuation.
+      try (ChunkScannerContext scanner = createChunkScanner(conn, currentStartKey, endKey,
+        continuedDigestState, isTargetStartKeyInclusive, isTargetEndKeyInclusive, true)) {
+        chunk = scanner.getNextChunk();
+        // chunk == null means no more rows in the target range.
+        // We must finalize the digest to produce a proper checksum for comparison.
+        if (chunk == null) {
+          if (continuedDigestState != null) {
+            combinedTargetChunk.hash =
+              SHA256DigestUtil.finalizeDigestToChecksum(continuedDigestState);
+          }
+          break;
+        }
+        if (combinedTargetChunk.startKey == null) {
+          combinedTargetChunk.startKey = chunk.startKey;
+        }
+        combinedTargetChunk.endKey = chunk.endKey;
+        combinedTargetChunk.rowCount += chunk.rowCount;
+        continuedDigestState = chunk.hash;
+        currentStartKey = chunk.endKey;
+        isTargetStartKeyInclusive = false;
+      }
+    }
+    return combinedTargetChunk;
+  }
+
+  /**
+   * Creates a reusable scanner context for fetching chunks from a range.
+   * @param conn                 Connection to cluster (source or target)
+   * @param startKey             Range start key (inclusive)
+   * @param endKey               Range end key (exclusive)
+   * @param continuedDigestState If not null, coprocessor will continue hashing from this state (for
+   *                             cross-region continuation on target)
+   * @param isStartKeyInclusive  Whether StartKey Inclusive
+   * @param isEndKeyInclusive    Whether EndKey Inclusive
+   * @throws IOException  scanner creation fails
+   * @throws SQLException hTable connection fails
+   */
+  private ChunkScannerContext createChunkScanner(Connection conn, byte[] startKey, byte[] endKey,
+    byte[] continuedDigestState, boolean isStartKeyInclusive, boolean isEndKeyInclusive,
+    boolean isTargetScan) throws IOException, SQLException {
+    // Not using try-with-resources since ChunkScannerContext owns the table lifecycle
+    Table hTable =
+      conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(physicalTableName);
+    Scan scan =
+      createChunkScan(startKey, endKey, isStartKeyInclusive, isEndKeyInclusive, isTargetScan);
+    scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION, TRUE_BYTES);
+    scan.setAttribute(BaseScannerRegionObserverConstants.SKIP_REGION_BOUNDARY_CHECK, TRUE_BYTES);
+    scan.setAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG, TRUE_BYTES);
+    if (continuedDigestState != null && continuedDigestState.length > 0) {
+      scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE,
+        continuedDigestState);
+    }
+
+    if (!isTargetScan) {
+      scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES,
+        Bytes.toBytes(chunkSizeBytes));
+    }
+    long syncTablePageTimeoutMs = (long) (conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
+      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5);

Review Comment:
   What is the basis for this multiplier? Should it be a configurable value?
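   For illustration, a sketch of how the fraction could be made configurable rather than hard-coded; the property name `phoenix.synctable.page.timeout.fraction` is hypothetical, and a `Map` stands in here for the Hadoop `Configuration` (real code would use `conf.getDouble()`):

```java
import java.util.Map;

// Hypothetical sketch: derive the page timeout as a configurable fraction of
// the RPC timeout instead of a hard-coded 0.5 multiplier.
class PageTimeoutSketch {

  static final String TIMEOUT_FRACTION_KEY = "phoenix.synctable.page.timeout.fraction"; // hypothetical key
  static final double DEFAULT_TIMEOUT_FRACTION = 0.5; // the current hard-coded value

  // conf stands in for org.apache.hadoop.conf.Configuration
  static long pageTimeoutMs(Map<String, String> conf, long rpcTimeoutMs) {
    double fraction = Double.parseDouble(
      conf.getOrDefault(TIMEOUT_FRACTION_KEY, String.valueOf(DEFAULT_TIMEOUT_FRACTION)));
    // Keep the page deadline below the RPC timeout so the server can return a
    // partial chunk before the client gives up on the RPC.
    return (long) (rpcTimeoutMs * fraction);
  }
}
```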



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java:
##########
@@ -1207,6 +1207,10 @@ public static boolean isIndexRebuild(Scan scan) {
    return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null;
   }
 
+  public static boolean isSyncTableChunkFormation(Scan scan) {
+    return scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION) != null;

Review Comment:
   I see you are explicitly setting to `TRUE_BYTES` but here you are just checking for existence, which is not a very good practice as it will break if someone sets it to `FALSE_BYTES`, so better to be defensive and check the value.
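   A minimal sketch of the defensive form, with a `Map` standing in for the `Scan` attributes and a local `TRUE_BYTES` stand-in; real code would call `scan.getAttribute(...)` and compare via `Bytes.equals` against `PDataType.TRUE_BYTES`:

```java
import java.util.Arrays;
import java.util.Map;

// Illustrative sketch: check the attribute's value, not just its presence.
class AttributeCheckSketch {

  static final byte[] TRUE_BYTES = new byte[] { 1 }; // stand-in for PDataType.TRUE_BYTES

  static boolean isSyncTableChunkFormation(Map<String, byte[]> scanAttributes) {
    byte[] value = scanAttributes.get("_SyncTableChunkFormation");
    // Presence alone is not enough: FALSE_BYTES would otherwise read as enabled.
    return value != null && Arrays.equals(value, TRUE_BYTES);
  }
}
```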



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java:
##########
@@ -1207,6 +1207,10 @@ public static boolean isIndexRebuild(Scan scan) {
    return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null;
   }
 
+  public static boolean isSyncTableChunkFormation(Scan scan) {

Review Comment:
   Do you mean isSyncTableChunkFormationEnabled?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java:
##########
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SHA256DigestUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mapper that acts as a driver for validating table data between source and target clusters. The
+ * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches
+ * chunk hashes from both clusters, compares them, and writes the results to the checkpoint table.
+ */
+public class PhoenixSyncTableMapper
+  extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class);
+
+  public enum SyncCounters {
+    CHUNKS_VERIFIED,
+    CHUNKS_MISMATCHED,
+    SOURCE_ROWS_PROCESSED,
+    TARGET_ROWS_PROCESSED
+  }
+
+  private String tableName;
+  private String targetZkQuorum;
+  private Long fromTime;
+  private Long toTime;
+  private boolean isDryRun;
+  private long chunkSizeBytes;
+  private Configuration conf;
+  private Connection sourceConnection;
+  private Connection targetConnection;
+  private Connection globalConnection;
+  private PTable pTable;
+  private byte[] physicalTableName;
+  private byte[] mapperRegionStart;
+  private byte[] mapperRegionEnd;
+  private PhoenixSyncTableOutputRepository syncTableOutputRepository;
+  private Timestamp mapperStartTime;
+
+  @Override
+  protected void setup(Context context) throws InterruptedException {
+    try {
+      super.setup(context);
+      mapperStartTime = new Timestamp(System.currentTimeMillis());
+      this.conf = context.getConfiguration();
+      tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf);
+      targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf);
+      fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf);
+      toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf);
+      isDryRun = PhoenixConfigurationUtil.getPhoenixSyncTableDryRun(conf);
+      chunkSizeBytes = PhoenixConfigurationUtil.getPhoenixSyncTableChunkSizeBytes(conf);
+      extractRegionBoundariesFromSplit(context);
+      sourceConnection = ConnectionUtil.getInputConnection(conf);
+      pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName);
+      physicalTableName = pTable.getPhysicalName().getBytes();
+      connectToTargetCluster();
+      globalConnection = createGlobalConnection(conf);
+      syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection);
+    } catch (SQLException | IOException e) {

Review Comment:
   I would include RuntimeException too, to be more aggressive in avoiding a 
resource leak.
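   A minimal sketch of the pattern, with illustrative names (`openResource`, `cleaned`) not taken from the PR: catching `RuntimeException` in setup ensures already-opened resources are released before the exception propagates.

   ```java
   public class SetupCleanupSketch {
     static boolean cleaned = false;

     static void openResource(boolean fail) {
       if (fail) {
         throw new IllegalStateException("unexpected runtime failure during setup");
       }
     }

     static void setup(boolean fail) {
       try {
         openResource(fail);
       } catch (RuntimeException e) { // in the PR this would sit beside SQLException | IOException
         cleaned = true; // close connections etc. before rethrowing
         throw e;
       }
     }

     public static void main(String[] args) {
       try {
         setup(true);
       } catch (RuntimeException e) {
         System.out.println("cleaned=" + cleaned);
       }
     }
   }
   ```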



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Data model class representing a row in the PHOENIX_SYNC_TABLE_CHECKPOINT table.
+ */
+public class PhoenixSyncTableOutputRow {
+
+  public enum Type {
+    CHUNK,
+    MAPPER_REGION
+  }
+
+  public enum Status {
+    VERIFIED,
+    MISMATCHED
+  }
+
+  private String tableName;
+  private String targetCluster;
+  private Type type;
+  private Long fromTime;
+  private Long toTime;
+  private Boolean isDryRun;
+  private byte[] startRowKey;
+  private byte[] endRowKey;
+  private Boolean isFirstRegion;
+  private Timestamp executionStartTime;
+  private Timestamp executionEndTime;
+  private Status status;
+  private String counters;
+
+  @Override
+  public String toString() {
+    return String.format("SyncOutputRow[table=%s, target=%s, type=%s, start=%s, end=%s, status=%s]",
+      tableName, targetCluster, type, Bytes.toStringBinary(startRowKey),
+      Bytes.toStringBinary(endRowKey), status);
+  }
+
+  @Override
+  @VisibleForTesting
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PhoenixSyncTableOutputRow that = (PhoenixSyncTableOutputRow) o;
+    return Objects.equals(tableName, that.tableName)
+      && Objects.equals(targetCluster, that.targetCluster) && type == that.type
+      && Objects.equals(fromTime, that.fromTime) && Objects.equals(toTime, that.toTime)
+      && Objects.equals(isDryRun, that.isDryRun) && Arrays.equals(startRowKey, that.startRowKey)
+      && Arrays.equals(endRowKey, that.endRowKey)
+      && Objects.equals(isFirstRegion, that.isFirstRegion)
+      && Objects.equals(executionStartTime, that.executionStartTime)
+      && Objects.equals(executionEndTime, that.executionEndTime) && status == that.status
+      && Objects.equals(counters, that.counters);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(tableName, targetCluster, type, fromTime, toTime, isDryRun,
+      isFirstRegion, executionStartTime, executionEndTime, status, counters);
+    result = 31 * result + Arrays.hashCode(startRowKey);
+    result = 31 * result + Arrays.hashCode(endRowKey);
+    return result;
+  }
+
+  @VisibleForTesting
+  public String getTableName() {
+    return tableName;
+  }
+
+  @VisibleForTesting
+  public String getTargetCluster() {
+    return targetCluster;
+  }
+
+  @VisibleForTesting
+  public Type getType() {
+    return type;
+  }
+
+  @VisibleForTesting
+  public Long getFromTime() {
+    return fromTime;
+  }
+
+  @VisibleForTesting
+  public Long getToTime() {
+    return toTime;
+  }
+
+  public byte[] getStartRowKey() {
+    return startRowKey != null ? Arrays.copyOf(startRowKey, startRowKey.length) : null;
+  }
+
+  public byte[] getEndRowKey() {
+    return endRowKey != null ? Arrays.copyOf(endRowKey, endRowKey.length) : null;
+  }
+
+  @VisibleForTesting
+  public Timestamp getExecutionStartTime() {
+    return executionStartTime;
+  }
+
+  @VisibleForTesting
+  public Timestamp getExecutionEndTime() {
+    return executionEndTime;
+  }
+
+  @VisibleForTesting
+  public Status getStatus() {
+    return status;
+  }
+
+  @VisibleForTesting
+  public String getCounters() {
+    return counters;
+  }
+
+  @VisibleForTesting
+  public long getSourceRowsProcessed() {
+    return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name());
+  }
+
+  @VisibleForTesting
+  public long getTargetRowsProcessed() {
+    return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name());
+  }
+
+  @VisibleForTesting

Review Comment:
   On a private function?



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+
+/**
+ * Utility class for SHA-256 digest state serialization and deserialization. We are not using the
+ * JDK-bundled SHA implementation, since its digest state cannot be serialized/deserialized, which
+ * is needed by PhoenixSyncTableTool for cross-region hash continuation.
+ */
+public class SHA256DigestUtil {
+
+  /**
+   * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes; we allow up

Review Comment:
   I didn't verify this, but DeepWiki says it can be up to 309 bytes: 
https://deepwiki.com/search/is-there-an-upper-limit-to-the_7872e61f-4f3f-462e-b4e9-cb6cbed47bd8?mode=fast



##########
phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+
+/**
+ * Utility class for SHA-256 digest state serialization and deserialization. We are not using the
+ * JDK-bundled SHA implementation, since its digest state cannot be serialized/deserialized, which
+ * is needed by PhoenixSyncTableTool for cross-region hash continuation.
+ */
+public class SHA256DigestUtil {
+
+  /**
+   * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes; we allow up
+   * to 128 bytes as a buffer.
+   */
+  public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128;
+
+  /**
+   * Encodes a SHA256Digest state to a byte array with a length prefix for validation. Format:
+   * [4-byte integer length][encoded digest state bytes]
+   * @param digest The digest whose state should be encoded
+   * @return Byte array containing integer length prefix + encoded state
+   */
+  public static byte[] encodeDigestState(SHA256Digest digest) {
+    byte[] encoded = digest.getEncodedState();
+    ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length);
+    buffer.putInt(encoded.length);
+    buffer.put(encoded);
+    return buffer.array();
+  }
+
+  /**
+   * Decodes a SHA256Digest state from a byte array.
+   * @param encodedState Byte array containing 4-byte integer length prefix + encoded state
+   * @return SHA256Digest restored to the saved state
+   * @throws IOException if the state is invalid or corrupted
+   */
+  public static SHA256Digest decodeDigestState(byte[] encodedState) throws IOException {
+    if (encodedState == null) {
+      throw new IllegalArgumentException("Invalid encoded digest state: encodedState is null");
+    }
+
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState));
+    int stateLength = dis.readInt();
+    // Prevent malicious large allocations
+    if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
+      throw new IllegalArgumentException(
+        String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength,
+          MAX_SHA256_DIGEST_STATE_SIZE));
+    }
+
+    byte[] state = new byte[stateLength];
+    dis.readFully(state);

Review Comment:
   Following my suggestion in encode, this will simply become:
   
   
   ```suggestion
       int stateLength = encodedState[0] & 0xff;
       // Prevent malicious large allocations
       if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
         throw new IllegalArgumentException(
          String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength,
             MAX_SHA256_DIGEST_STATE_SIZE));
       }
   
       byte[] state = new byte[stateLength];
       System.arraycopy(encodedState, 1, state, 0, stateLength);
   ```
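   A round-trip sketch of the single-byte length prefix proposed above, using a plain `byte[]` payload in place of the BouncyCastle `SHA256Digest` encoded state (a one-byte length is sufficient since valid lengths are capped at 128):

   ```java
   import java.util.Arrays;

   public class LengthPrefixSketch {
     static final int MAX_STATE_SIZE = 128;

     static byte[] encode(byte[] state) {
       byte[] out = new byte[1 + state.length];
       out[0] = (byte) state.length; // single unsigned byte covers lengths <= 128
       System.arraycopy(state, 0, out, 1, state.length);
       return out;
     }

     static byte[] decode(byte[] encoded) {
       int stateLength = encoded[0] & 0xff;
       // Prevent malicious large allocations
       if (stateLength > MAX_STATE_SIZE) {
         throw new IllegalArgumentException("Invalid state length: " + stateLength);
       }
       byte[] state = new byte[stateLength];
       System.arraycopy(encoded, 1, state, 0, stateLength);
       return state;
     }

     public static void main(String[] args) {
       byte[] state = new byte[96]; // roughly the size of a SHA-256 digest state
       Arrays.fill(state, (byte) 7);
       System.out.println(Arrays.equals(state, decode(encode(state)))); // round trip succeeds
     }
   }
   ```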



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper-level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk-level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * the connection lifecycle.
+   * @param connection Phoenix connection (must remain open for the repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    START_ROW_KEY VARBINARY_ENCODED,\n"
+      + "    END_ROW_KEY VARBINARY_ENCODED,\n" + "    IS_DRY_RUN BOOLEAN, \n"
+      + "    EXECUTION_START_TIME TIMESTAMP,\n" + "    EXECUTION_END_TIME TIMESTAMP,\n"
+      + "    STATUS VARCHAR(20),\n" + "    COUNTERS VARCHAR(255), \n"
+      + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n" + "        TARGET_CLUSTER,\n"
+      + "        TYPE ,\n" + "        FROM_TIME,\n" + "        TO_TIME,\n"
+      + "        START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(String tableName, String targetCluster, Type type,
+    Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] endKey, Status status,
+    Timestamp executionStartTime, Timestamp executionEndTime, String counters) throws SQLException {
+
+    // Validate required parameters
+    if (tableName == null || tableName.isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint");
+    }
+    if (targetCluster == null || targetCluster.isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint");
+    }
+    if (type == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (fromTime == null || toTime == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
+    }
+
+    try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, tableName);
+      ps.setString(2, targetCluster);
+      ps.setString(3, type.name());
+      ps.setLong(4, fromTime);
+      ps.setLong(5, toTime);
+      ps.setBytes(6, startKey);
+      ps.setBytes(7, endKey);
+      ps.setBoolean(8, isDryRun);
+      ps.setTimestamp(9, executionStartTime);
+      ps.setTimestamp(10, executionEndTime);
+      ps.setString(11, status != null ? status.name() : null);
+      ps.setString(12, counters);
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out
+   * already-processed regions.
+   * @param tableName     Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime      Start timestamp (nullable)
+   * @param toTime        End timestamp (nullable)
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String tableName,
+    String targetCluster, Long fromTime, Long toTime) throws SQLException {
+
+    String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ?  AND TARGET_CLUSTER = ?"
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
+    List<PhoenixSyncTableOutputRow> results = new ArrayList<>();
+    try (PreparedStatement ps = connection.prepareStatement(query)) {
+      int paramIndex = 1;
+      ps.setString(paramIndex++, tableName);
+      ps.setString(paramIndex++, targetCluster);
+      ps.setString(paramIndex++, Type.MAPPER_REGION.name());
+      ps.setLong(paramIndex++, fromTime);
+      ps.setLong(paramIndex++, toTime);
+      ps.setString(paramIndex++, Status.VERIFIED.name());
+      ps.setString(paramIndex, Status.MISMATCHED.name());
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          PhoenixSyncTableOutputRow row =
+            new PhoenixSyncTableOutputRow.Builder().setStartRowKey(rs.getBytes("START_ROW_KEY"))
+              .setEndRowKey(rs.getBytes("END_ROW_KEY")).build();
+          results.add(row);
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks.
+   * @param tableName         Source table name
+   * @param targetCluster     Target cluster ZK quorum
+   * @param fromTime          Start timestamp (nullable)
+   * @param toTime            End timestamp (nullable)
+   * @param mapperRegionStart Mapper region start key
+   * @param mapperRegionEnd   Mapper region end key
+   * @return List of processed chunks in the region
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedChunks(String tableName, String targetCluster,
+    Long fromTime, Long toTime, byte[] mapperRegionStart, byte[] mapperRegionEnd)
+    throws SQLException {
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? "
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?");
+
+    // Check if mapper region boundaries are non-empty (i.e., NOT first/last regions)
+    // Only add boundary conditions for non-empty boundaries
+    boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length > 0;
+    boolean hasStartBoundary = mapperRegionStart != null && mapperRegionStart.length > 0;
+
+    // Filter chunks that overlap with this mapper region:
+    // - Chunk overlaps if: chunkStart < mapperRegionEnd (when end boundary exists)
+    // - Chunk overlaps if: chunkEnd > mapperRegionStart (when start boundary exists)
+    if (hasEndBoundary) {
+      queryBuilder.append(" AND START_ROW_KEY <= ?");
+    }
+    if (hasStartBoundary) {
+      queryBuilder.append(" AND END_ROW_KEY >= ?");

Review Comment:
   For the last region there will be no constraint on START_ROW_KEY, and since 
END_ROW_KEY is not part of the PK, this query can perform poorly, I think.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixSyncTableRegionScanner.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.SHA256DigestUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Server-side coprocessor that performs chunk formation and SHA-256 hashing for
+ * PhoenixSyncTableTool.
+ * <p>
+ * Accumulates rows into chunks (based on size limits) and computes a hash of all row data (keys,
+ * column families, qualifiers, timestamps, cell types, values). In case of a paging timeout,
+ * returns whatever is accumulated in the chunk. If nothing is accumulated, returns a dummy row
+ * with either the previous result rowKey or the max possible key < currentRowKey.
+ * <p>
+ * Source scan (isTargetScan=false): Returns complete chunks (if paging didn't time out) bounded by
+ * region boundaries. Sets hasMoreRows=false when the region is exhausted.
+ * <p>
+ * Target scan (isTargetScan=true): Returns partial chunks with serialized digest state when a
+ * region boundary is reached, allowing cross-region hash continuation.
+ * <p>
+ * Returns chunk metadata cells: START_KEY, END_KEY, HASH (or digest state), ROW_COUNT,
+ * IS_PARTIAL_CHUNK
+ */
+public class PhoenixSyncTableRegionScanner extends BaseRegionScanner {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableRegionScanner.class);
+  private static final byte[] CHUNK_METADATA_FAMILY = SINGLE_COLUMN_FAMILY;
+  private final Region region;
+  private final Scan scan;
+  private final RegionCoprocessorEnvironment env;
+  private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+  private final long chunkSizeBytes;
+  private boolean isTargetScan = false;
+  private byte[] chunkStartKey = null;
+  private byte[] chunkEndKey = null;
+  private long currentChunkSize = 0L;
+  private long currentChunkRowCount = 0L;
+  private final SHA256Digest digest;
+  private boolean hasMoreRows = true;
+  private boolean isUsingContinuedDigest;
+  private byte[] previousResultRowKey = null;
+  private final byte[] initStartRowKey;
+  private final boolean includeInitStartRowKey;
+  private final long pageSizeMs;
+
+  /**
+   * Creates a PhoenixSyncTableRegionScanner for chunk-based hashing.
+   * @param innerScanner                     The underlying region scanner
+   * @param region                           The region being scanned
+   * @param scan                             The scan request
+   * @param env                              The coprocessor environment
+   * @param ungroupedAggregateRegionObserver Parent observer for region state checks
+   * @throws IllegalStateException if digest state restoration fails
+   */
+  @VisibleForTesting
+  public PhoenixSyncTableRegionScanner(final RegionScanner innerScanner, final Region region,
+    final Scan scan, final RegionCoprocessorEnvironment env,
+    final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver, long pageSizeMs) {
+    super(innerScanner);
+    this.region = region;
+    this.scan = scan;
+    this.env = env;
+    this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+    byte[] chunkSizeAttr =
+      scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES);
+    if (chunkSizeAttr == null) { // Since we don't set chunk size scan attr for target cluster scan
+      this.isTargetScan = true;
+    }
+    this.chunkSizeBytes = chunkSizeAttr != null
+      ? Bytes.toLong(chunkSizeAttr)
+      : DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES;
+
+    // Check if we should continue from a previous digest state (cross-region continuation)
+    byte[] continuedDigestStateAttr =
+      scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE);
+    if (continuedDigestStateAttr != null) {
+      try {
+        this.digest = SHA256DigestUtil.decodeDigestState(continuedDigestStateAttr);
+        this.isUsingContinuedDigest = true;
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to restore continued digest state", e);
+      }
+    } else {
+      this.digest = new SHA256Digest();
+      this.isUsingContinuedDigest = false;
+    }
+    this.initStartRowKey = scan.getStartRow();
+    this.includeInitStartRowKey = scan.includeStartRow();
+    this.pageSizeMs = pageSizeMs;
+  }
+
+  @Override
+  public boolean next(List<Cell> results) throws IOException {
+    return next(results, null);
+  }
+
+  /**
+   * Accumulates rows into a chunk and returns chunk metadata cells. Supports server-side paging
+   * via {@link PhoenixScannerContext} following the same pattern as
+   * {@link GroupedAggregateRegionObserver} and {@link UncoveredIndexRegionScanner}.
+   * @param results        Output list to populate with chunk metadata cells
+   * @param scannerContext Phoenix scanner context for paging timeout detection
+   * @return true if more chunks are available, false if scanning is complete
+   */
+  @Override
+  public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
+    region.startRegionOperation();
+    try {
+      resetChunkState();
+      RegionScanner localScanner = delegate;
+      synchronized (localScanner) {
+        List<Cell> rowCells = new ArrayList<>();
+        while (hasMoreRows) {
+          ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
+          rowCells.clear();
+          hasMoreRows = (scannerContext == null)
+            ? localScanner.nextRaw(rowCells)
+            : localScanner.nextRaw(rowCells, scannerContext);
+
+          if (!rowCells.isEmpty() && ScanUtil.isDummy(rowCells)) {
+            if (chunkStartKey == null) {
+              LOGGER.warn("Paging timed out while fetching first row of chunk, initStartRowKey: {}",
+                Bytes.toStringBinary(initStartRowKey));
+              updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
+              return true;
+            } else {
+              break;
+            }
+          }
+
+          if (rowCells.isEmpty()) {
+            break;
+          }
+
+          byte[] rowKey = CellUtil.cloneRow(rowCells.get(0));
+          long rowSize = calculateRowSize(rowCells);
+          addRowToChunk(rowKey, rowCells, rowSize);
+          if (!isTargetScan && willExceedChunkLimits(rowSize)) {
+            break;
+          }
+          if (
+            hasMoreRows && (PhoenixScannerContext.isReturnImmediately(scannerContext)
+              || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs))
+          ) {
+            LOGGER.info("Paging timeout after {} rows ({} bytes) in region {}, chunk [{}:{}]",
+              currentChunkRowCount, currentChunkSize,
+              region.getRegionInfo().getRegionNameAsString(), Bytes.toStringBinary(chunkStartKey),
+              Bytes.toStringBinary(chunkEndKey));
+            PhoenixScannerContext.setReturnImmediately(scannerContext);
+            break;
+          }
+        }
+      }
+      if (chunkStartKey == null) {
+        return false;
+      }
+
+      buildChunkMetadataResult(results, isTargetScan);
+      previousResultRowKey = chunkEndKey;
+      return hasMoreRows;
+    } catch (Throwable t) {
+      LOGGER.error(
+        "Exception during chunk scanning in region {} table {} at chunk startKey: {}, endKey: {}",
+        region.getRegionInfo().getRegionNameAsString(),
+        region.getRegionInfo().getTable().getNameAsString(),
+        chunkStartKey != null ? Bytes.toStringBinary(chunkStartKey) : "null",
+        chunkEndKey != null ? Bytes.toStringBinary(chunkEndKey) : "null", t);
+      throw t;
+    } finally {
+      region.closeRegionOperation();
+    }
+  }
+
+  /**
+   * Resets chunk state for a new chunk. Note: if this scanner was initialized with continued
+   * digest state, the first call to this method will NOT reset the digest, allowing us to
+   * continue hashing from the previous region's state.
+   */
+  private void resetChunkState() {
+    chunkStartKey = null;
+    chunkEndKey = null;
+    currentChunkSize = 0;
+    currentChunkRowCount = 0;
+    if (!isUsingContinuedDigest) {
+      digest.reset();
+    }
+    isUsingContinuedDigest = false;
+  }
+
+  private long calculateRowSize(List<Cell> cells) {
+    long size = 0;
+    for (Cell cell : cells) {
+      size += PrivateCellUtil.estimatedSerializedSizeOf(cell);
+    }
+    return size;
+  }
+
+  private boolean willExceedChunkLimits(long rowSize) {
+    return currentChunkSize + rowSize > chunkSizeBytes;
+  }
+
+  /**
+   * Adds a row to the current chunk and updates the digest.
+   */
+  private void addRowToChunk(byte[] rowKey, List<Cell> cells, long rowSize) {
+    // Set chunk start key on first row
+    if (chunkStartKey == null) {
+      chunkStartKey = rowKey;
+    }
+    chunkEndKey = rowKey;
+    currentChunkSize += rowSize;
+    currentChunkRowCount++;
+    updateDigestWithRow(rowKey, cells);
+  }
+
+  /**
+   * Updates the SHA-256 digest with data from a row. The hash covers: row key + cell family +
+   * cell qualifier + cell timestamp + cell type + cell value. This ensures that any difference
+   * in the data will result in a different hash.
+   */
+  private void updateDigestWithRow(byte[] rowKey, List<Cell> cells) {
+    digest.update(rowKey, 0, rowKey.length);
+    byte[] timestampBuffer = new byte[8];
+    for (Cell cell : cells) {
+      digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+      digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+      long ts = cell.getTimestamp();
+      Bytes.putLong(timestampBuffer, 0, ts);
+      digest.update(timestampBuffer, 0, 8);
+      digest.update(cell.getType().getCode());
+      digest.update(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+    }
+  }
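
Since the per-cell update order above is exactly what makes the digest sensitive to every attribute, a stand-alone illustration may help. The sketch below substitutes the JDK's `MessageDigest` for BouncyCastle's `SHA256Digest` (the patch needs BC only because its intermediate state can be serialized), and all names in it are illustrative rather than Phoenix code:

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative stand-in for updateDigestWithRow: hashes one cell's
// attributes in the same order as the patch (row key, family, qualifier,
// timestamp, type code, value). Not actual Phoenix code.
class RowDigestSketch {
  static byte[] hashCell(byte[] rowKey, byte[] family, byte[] qualifier, long timestamp,
      byte typeCode, byte[] value) throws NoSuchAlgorithmException {
    MessageDigest md = MessageDigest.getInstance("SHA-256");
    md.update(rowKey);
    md.update(family);
    md.update(qualifier);
    // Bytes.putLong writes big-endian, which ByteBuffer does by default.
    md.update(ByteBuffer.allocate(8).putLong(timestamp).array());
    md.update(typeCode);
    md.update(value);
    return md.digest();
  }
}
```

Two cells that differ only in timestamp (or type, or value) then produce different hashes, which is the property the javadoc claims.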
+
+  /**
+   * Builds chunk metadata result cells and adds them to the results list. Returns a single
+   * "row" [rowKey=chunkEndKey] with multiple cells containing chunk metadata [chunkStartKey,
+   * hash/digest, rowCount, isPartialChunk]. For complete chunks it includes the final SHA-256
+   * hash (32 bytes); for partial chunks it includes the serialized digest state for continuation.
+   * @param results        Output list to populate with chunk metadata cells
+   * @param isPartialChunk true if this is a partial chunk (region boundary reached before
+   *                       completion)
+   */
+  private void buildChunkMetadataResult(List<Cell> results, boolean isPartialChunk)
+    throws IOException {
+    byte[] resultRowKey = this.chunkEndKey;
+    results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+      BaseScannerRegionObserverConstants.SYNC_TABLE_START_KEY_QUALIFIER, AGG_TIMESTAMP,
+      chunkStartKey));
+    results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+      BaseScannerRegionObserverConstants.SYNC_TABLE_ROW_COUNT_QUALIFIER, AGG_TIMESTAMP,
+      Bytes.toBytes(currentChunkRowCount)));
+    if (isPartialChunk) {
+      // Partial chunk digest
+      byte[] digestState = SHA256DigestUtil.encodeDigestState(digest);
+      results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+        BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER, AGG_TIMESTAMP,
+        TRUE_BYTES));
+      results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+        BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER, AGG_TIMESTAMP, digestState));
+    } else {
+      // Complete chunk - finalize and return hash
+      byte[] hash = SHA256DigestUtil.finalizeDigestToChecksum(digest);
+      results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+        BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER, AGG_TIMESTAMP, hash));
+      results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+        BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER, AGG_TIMESTAMP,
+        FALSE_BYTES));
+    }
+    }
+  }
+
+  /**
+   * Adds a dummy cell to the result list based on either the previous rowKey returned to the
+   * client or the scan start rowKey and its inclusion flag.
+   * @param result                 result list to add the dummy cell to.
+   * @param initStartRowKey        scan start rowKey.
+   * @param includeInitStartRowKey whether the scan start rowKey is included.
+   * @param scan                   scan object.
+   */
+  private void updateDummyWithPrevRowKey(List<Cell> result, byte[] initStartRowKey,
+    boolean includeInitStartRowKey, Scan scan) {
+    result.clear();
+    if (previousResultRowKey != null) {
+      getDummyResult(previousResultRowKey, result);
+    } else {
+      if (includeInitStartRowKey && initStartRowKey.length > 0) {
+        byte[] prevKey;
+        // In order to generate the largest possible rowkey that is less than
+        // initStartRowKey, we need to check the size of the region name that can be
+        // used by the hbase client for meta lookup, in case the meta cache is expired
+        // at the client. Once we know regionLookupInMetaLen, use it to generate the
+        // largest possible rowkey that is lower than initStartRowKey by using the
+        // ByteUtil#previousKeyWithLength function, which appends "\\xFF" bytes to the
+        // prev rowKey up to the length provided, e.g. for the given key
+        // "\\x01\\xC1\\x06", the previous key with length 5 would be
+        // "\\x01\\xC1\\x05\\xFF\\xFF" by padding 2 bytes "\\xFF".
+        // The length of the largest scan start rowkey should not exceed
+        // HConstants#MAX_ROW_LENGTH.
+        int regionLookupInMetaLen =
+          RegionInfo.createRegionName(region.getTableDescriptor().getTableName(), new byte[1],
+            HConstants.NINES, false).length;
+        if (
+          Bytes.compareTo(initStartRowKey, initStartRowKey.length - 1, 1, ByteUtil.ZERO_BYTE, 0, 1)
+              == 0
+        ) {
+          // If initStartRowKey has "\\x00" as its last byte, we can discard the last
+          // byte and send the key as the dummy rowKey.
+          prevKey = new byte[initStartRowKey.length - 1];
+          System.arraycopy(initStartRowKey, 0, prevKey, 0, prevKey.length);
+        } else if (initStartRowKey.length < (HConstants.MAX_ROW_LENGTH - 1 - regionLookupInMetaLen)) {
+          prevKey = ByteUtil.previousKeyWithLength(
+            ByteUtil.concat(initStartRowKey,
+              new byte[HConstants.MAX_ROW_LENGTH - initStartRowKey.length - 1
+                - regionLookupInMetaLen]),
+            HConstants.MAX_ROW_LENGTH - 1 - regionLookupInMetaLen);
+        } else {
+          prevKey = initStartRowKey;
+        }
+        getDummyResult(prevKey, result);

Review Comment:
   Can you move the logic to derive `prevKey` to something like
   `ScanUtil.getEffectivePrevKeyForPaging` and add unit tests for each scenario?
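
   For reference, one possible shape for that helper. This is only a sketch: the name `getEffectivePrevKeyForPaging` comes from the suggestion above, and the HBase pieces (`ByteUtil.previousKeyWithLength`, `HConstants.MAX_ROW_LENGTH`, the meta-lookup region name length) are replaced by plain-array equivalents so each branch can be unit-tested without a minicluster:

```java
import java.util.Arrays;

// Hypothetical extraction of the prevKey derivation; not actual Phoenix code.
// Assumes a non-empty key (the caller guards initStartRowKey.length > 0).
class PrevKeySketch {
  // Stand-in for HConstants.MAX_ROW_LENGTH (Short.MAX_VALUE in HBase).
  static final int MAX_ROW_LENGTH = Short.MAX_VALUE;

  static byte[] getEffectivePrevKeyForPaging(byte[] initStartRowKey, int regionLookupInMetaLen) {
    // Branch 1: a trailing 0x00 byte can simply be dropped.
    if (initStartRowKey[initStartRowKey.length - 1] == 0x00) {
      return Arrays.copyOf(initStartRowKey, initStartRowKey.length - 1);
    }
    int maxUsable = MAX_ROW_LENGTH - 1 - regionLookupInMetaLen;
    if (initStartRowKey.length < maxUsable) {
      // Branch 2: decrement the last byte (safe, since branch 1 ruled out 0x00)
      // and pad with 0xFF up to maxUsable, approximating previousKeyWithLength.
      byte[] prev = Arrays.copyOf(initStartRowKey, maxUsable);
      prev[initStartRowKey.length - 1]--;
      Arrays.fill(prev, initStartRowKey.length, maxUsable, (byte) 0xFF);
      return prev;
    }
    // Branch 3: no room to pad; fall back to the start key itself.
    return initStartRowKey;
  }
}
```

   For example, for the key `\x01\xC1\x06` with five usable bytes this yields `\x01\xC1\x05\xFF\xFF`, matching the worked example in the patch's comment.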


