haridsv commented on code in PR #2379: URL: https://github.com/apache/phoenix/pull/2379#discussion_r2989688144
########## phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.util.Bytes; +import org.bouncycastle.crypto.digests.SHA256Digest; + +/** + * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk + * bundled SHA, since their digest can't be serialized/deserialized which is needed for + * PhoenixSyncTableTool for cross-region hash continuation. + */ +public class SHA256DigestUtil { + + /** + * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up Review Comment: Can you point me to the documentation on the size being ~96 bytes? ########## phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.util.Bytes; +import org.bouncycastle.crypto.digests.SHA256Digest; + +/** + * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk + * bundled SHA, since their digest can't be serialized/deserialized which is needed for + * PhoenixSyncTableTool for cross-region hash continuation. + */ +public class SHA256DigestUtil { + + /** + * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up + * to 128 bytes as buffer. + */ + public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128; + + /** + * Encodes a SHA256Digest state to a byte array with length prefix for validation. 
Format: [4-byte + * integer length][encoded digest state bytes] + * @param digest The digest whose state should be encoded + * @return Byte array containing integer length prefix + encoded state + */ + public static byte[] encodeDigestState(SHA256Digest digest) { + byte[] encoded = digest.getEncodedState(); + ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length); + buffer.putInt(encoded.length); + buffer.put(encoded); + return buffer.array(); + } Review Comment: Since MAX_SHA256_DIGEST_STATE_SIZE is capped at 128 bytes, using a 4-byte integer and ByteBuffer for the length prefix is slightly over-engineered. We can optimize this by using a single byte for the length and Bytes.add() for concatenation. This would allow us to remove the ByteBuffer, ByteArrayInputStream, and DataInputStream dependencies in these utility methods (a round-trip sketch of this encoding follows the next review comment below). ```suggestion public static byte[] encodeDigestState(SHA256Digest digest) { byte[] encoded = digest.getEncodedState(); // Use an unsigned byte as 128 > Byte.MAX_VALUE return Bytes.add(new byte[]{(byte) (encoded.length & 0xff)}, encoded); } ``` ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java: ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Objects; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Data model class representing required row in the PHOENIX_SYNC_TABLE_CHECKPOINT table + */ +public class PhoenixSyncTableOutputRow { + + public enum Type { + CHUNK, + MAPPER_REGION + } + + public enum Status { + VERIFIED, + MISMATCHED + } + + private String tableName; + private String targetCluster; + private Type type; + private Long fromTime; + private Long toTime; + private Boolean isDryRun; + private byte[] startRowKey; + private byte[] endRowKey; + private Boolean isFirstRegion; Review Comment: Is this being used?
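To illustrate the single-byte length prefix suggested above for encodeDigestState, here is a minimal round-trip sketch. It is not the PR's code: the helper bodies mirror the encode/decode suggestions in this review, and it assumes Bouncy Castle's SHA256Digest(byte[] encodedState) restore constructor and HBase's Bytes.add for concatenation.

```java
import org.apache.hadoop.hbase.util.Bytes;
import org.bouncycastle.crypto.digests.SHA256Digest;

public class DigestStateRoundTripSketch {

  // Mirrors the suggested encode: single unsigned-byte length prefix + encoded state.
  static byte[] encodeDigestState(SHA256Digest digest) {
    byte[] encoded = digest.getEncodedState();
    // Use an unsigned byte since 128 > Byte.MAX_VALUE
    return Bytes.add(new byte[] { (byte) (encoded.length & 0xff) }, encoded);
  }

  // Mirrors the suggested decode: read the unsigned-byte length, copy the state out,
  // and rebuild the digest from its encoded state.
  static SHA256Digest decodeDigestState(byte[] encodedState) {
    int stateLength = encodedState[0] & 0xff;
    byte[] state = new byte[stateLength];
    System.arraycopy(encodedState, 1, state, 0, stateLength);
    return new SHA256Digest(state);
  }

  public static void main(String[] args) {
    byte[] part1 = Bytes.toBytes("rows from region 1");
    byte[] part2 = Bytes.toBytes("rows from region 2");

    // Hash both parts in a single pass (the expected checksum).
    SHA256Digest oneShot = new SHA256Digest();
    oneShot.update(part1, 0, part1.length);
    oneShot.update(part2, 0, part2.length);
    byte[] expected = new byte[oneShot.getDigestSize()];
    oneShot.doFinal(expected, 0);

    // Hash part1, serialize the state (as a scan crossing a region boundary would),
    // then restore and continue with part2.
    SHA256Digest first = new SHA256Digest();
    first.update(part1, 0, part1.length);
    byte[] carried = encodeDigestState(first);

    SHA256Digest resumed = decodeDigestState(carried);
    resumed.update(part2, 0, part2.length);
    byte[] actual = new byte[resumed.getDigestSize()];
    resumed.doFinal(actual, 0);

    System.out.println("continuation matches: " + java.util.Arrays.equals(expected, actual));
  }
}
```

The equality of the two checksums is the property the tool relies on for cross-region hash continuation on the target cluster.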
########## phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java: ########## @@ -199,4 +199,23 @@ public static long getMaxLookbackInMillis(Configuration conf) { /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; + + /** + * PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum + */ + public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation"; + public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes"; + public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState"; Review Comment: Can you add JavaDoc on all 3 constants individually with a description of what they the attribute is and what type of value it would contain? ########## phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.util.Bytes; +import org.bouncycastle.crypto.digests.SHA256Digest; + +/** + * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk + * bundled SHA, since their digest can't be serialized/deserialized which is needed for + * PhoenixSyncTableTool for cross-region hash continuation. + */ +public class SHA256DigestUtil { + + /** + * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up + * to 128 bytes as buffer. + */ + public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128; + + /** + * Encodes a SHA256Digest state to a byte array with length prefix for validation. Format: [4-byte + * integer length][encoded digest state bytes] + * @param digest The digest whose state should be encoded + * @return Byte array containing integer length prefix + encoded state + */ + public static byte[] encodeDigestState(SHA256Digest digest) { + byte[] encoded = digest.getEncodedState(); + ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length); + buffer.putInt(encoded.length); + buffer.put(encoded); + return buffer.array(); + } Review Comment: BTW, can you tell me why we need to encode the length into it? You are using `PhoenixKeyValueUtil.newKeyValue` which is already encoding the length of the `byte[]` anyway. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + +import java.io.IOException; +import java.security.MessageDigest; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SHA256DigestUtil; +import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Mapper that acts as a driver for validating table data between source and target clusters. The + * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches + * chunk hashes from both clusters, compares them and write to checkpoint table. 
+ */ +public class PhoenixSyncTableMapper + extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class); + + public enum SyncCounters { + CHUNKS_VERIFIED, + CHUNKS_MISMATCHED, + SOURCE_ROWS_PROCESSED, + TARGET_ROWS_PROCESSED + } + + private String tableName; + private String targetZkQuorum; + private Long fromTime; + private Long toTime; + private boolean isDryRun; + private long chunkSizeBytes; + private Configuration conf; + private Connection sourceConnection; + private Connection targetConnection; + private Connection globalConnection; + private PTable pTable; + private byte[] physicalTableName; + private byte[] mapperRegionStart; + private byte[] mapperRegionEnd; + private PhoenixSyncTableOutputRepository syncTableOutputRepository; + private Timestamp mapperStartTime; + + @Override + protected void setup(Context context) throws InterruptedException { + try { + super.setup(context); + mapperStartTime = new Timestamp(System.currentTimeMillis()); + this.conf = context.getConfiguration(); + tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf); + targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf); + fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf); + toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf); + isDryRun = PhoenixConfigurationUtil.getPhoenixSyncTableDryRun(conf); + chunkSizeBytes = PhoenixConfigurationUtil.getPhoenixSyncTableChunkSizeBytes(conf); + extractRegionBoundariesFromSplit(context); + sourceConnection = ConnectionUtil.getInputConnection(conf); + pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName); + physicalTableName = pTable.getPhysicalName().getBytes(); + connectToTargetCluster(); + globalConnection = createGlobalConnection(conf); + syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection); + } catch (SQLException | IOException e) { + tryClosingResources(); + throw new RuntimeException( + String.format("Failed to setup PhoenixSyncTableMapper for table: %s", tableName), e); + } + } + + /** + * Extracts mapper region boundaries from the PhoenixInputSplit + */ + private void extractRegionBoundariesFromSplit(Context context) { + PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit(); + KeyRange keyRange = split.getKeyRange(); + if (keyRange == null) { + throw new IllegalStateException(String.format( + "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.", + tableName)); + } + mapperRegionStart = keyRange.getLowerRange(); + mapperRegionEnd = keyRange.getUpperRange(); + } + + /** + * Connects to the target cluster using the target ZK quorum, port, znode, krb principal + */ + private void connectToTargetCluster() throws SQLException, IOException { + Configuration targetConf = + PhoenixMapReduceUtil.createConfigurationForZkQuorum(conf, targetZkQuorum); + targetConnection = ConnectionUtil.getInputConnection(targetConf); + } + + /** + * Creates a global (non-tenant) connection for the checkpoint table. 
+ */ + private Connection createGlobalConnection(Configuration conf) throws SQLException { + Configuration globalConf = new Configuration(conf); + globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + globalConf.unset(PhoenixRuntime.CURRENT_SCN_ATTRIB); + return ConnectionUtil.getInputConnection(globalConf); + } + + /** + * Processes a mapper region by comparing chunks between source and target clusters. Gets already + * processed chunks from checkpoint table, resumes from check pointed progress and records final + * status for chunks & mapper (VERIFIED/MISMATCHED). + */ + @Override + protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context) + throws IOException, InterruptedException { + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + try { + List<PhoenixSyncTableOutputRow> processedChunks = + syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime, + mapperRegionStart, mapperRegionEnd); + List<Pair<byte[], byte[]>> unprocessedRanges = + calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, processedChunks); + boolean isStartKeyInclusive = shouldStartKeyBeInclusive(mapperRegionStart, processedChunks); + for (Pair<byte[], byte[]> range : unprocessedRanges) { + processMapperRanges(range.getFirst(), range.getSecond(), isStartKeyInclusive, context); + isStartKeyInclusive = false; + } + + long mismatchedChunk = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue(); + long verifiedChunk = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue(); + long sourceRowsProcessed = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue(); + long targetRowsProcessed = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue(); + Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis()); + String counters = formatMapperCounters(verifiedChunk, mismatchedChunk, sourceRowsProcessed, + targetRowsProcessed); + + if (sourceRowsProcessed > 0) { + if (mismatchedChunk == 0) { + context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1); + syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum, + PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun, + mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.VERIFIED, + mapperStartTime, mapperEndTime, counters); + LOGGER.info( + "PhoenixSyncTable mapper completed with verified: {} verified chunks, {} mismatched chunks", + verifiedChunk, mismatchedChunk); + } else { + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + LOGGER.warn( + "PhoenixSyncTable mapper completed with mismatch: {} verified chunks, {} mismatched chunks", + verifiedChunk, mismatchedChunk); + syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum, + PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun, + mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.MISMATCHED, + mapperStartTime, mapperEndTime, counters); + } + } else { + LOGGER.info( + "No rows pending to process. All mapper region boundaries are covered for startKey:{}, endKey: {}", + mapperRegionStart, mapperRegionEnd); + } + } catch (SQLException e) { + tryClosingResources(); + throw new RuntimeException("Error processing PhoenixSyncTableMapper", e); + } + } + + /** + * Processes a chunk range by comparing source and target cluster data. Source chunking: Breaks + * data into size-based chunks within given mapper region boundary. 
Target chunking: Follows + * source chunk boundaries. Source chunk boundary might be split across multiple target region, if + * so corpoc signals for partial chunk with partial digest. Once entire Source chunk is covered by + * target scanner, we calculate resulting checksum from combined digest. + * @param rangeStart Range start key + * @param rangeEnd Range end key + * @param isSourceStartKeyInclusive Whether startKey be inclusive for source chunking + * @param context Mapper context for progress and counters + * @throws IOException if scan fails + * @throws SQLException if database operations fail + */ + private void processMapperRanges(byte[] rangeStart, byte[] rangeEnd, + boolean isSourceStartKeyInclusive, Context context) throws IOException, SQLException { + // To handle scenario of target having extra keys compared to source keys: + // For every source chunk, we track whether its first chunk of Region or whether its lastChunk + // of region + // For every source chunk, we issue scan on target with + // - FirstChunkOfRegion : target scan start boundary would be rangeStart + // - LastChunkOfRegion : target scan end boundary would be rangeEnd + // - notFirstChunkOfRegion: target scan start boundary would be previous source chunk endKey + // - notLastChunkOfRegion: target scan end boundary would be current source chunk endKey + // Lets understand with an example. + // Source region boundary is [c,n) and source chunk returns [c1,d] , here `c` key is not present + // in source + // It could be the case that target has `c` present, so we issue scan on target chunk with + // startKey as `c` and not `c1` i.e [c,d] + // Similarly, if two consecutive source chunk returns its boundary as [e,g] and [h,j] + // When target is scanning for [h,j], it would issue scan with (g,j] to ensure we cover any + // extra key which is not in source but present in target + // + // Now eventually when chunking will reach for last source chunk on this region boundary, we + // again pass rangeEnd(with Exclusive) as target chunk boundary. + // Lets say, for above region boundary example second last and last sourceChunk returns [j,k] + // and [l,m]. Target chunk would issue scan for last chunk (k,n) + boolean isLastChunkOfRegion = false; + // We only want target startKey to be inclusive if source startKey is inclusive as well + // Source start key won't be inclusive if start of region boundary is already processed as chunk + // and check pointed + // Refer to shouldStartKeyBeInclusive() method to understand more about when source start key + // would be exclusive + boolean isTargetStartKeyInclusive = isSourceStartKeyInclusive; + try (ChunkScannerContext sourceScanner = createChunkScanner(sourceConnection, rangeStart, + rangeEnd, null, isSourceStartKeyInclusive, false, false)) { + ChunkInfo previousSourceChunk = null; + ChunkInfo sourceChunk = sourceScanner.getNextChunk(); + while (sourceChunk != null) { + sourceChunk.executionStartTime = new Timestamp(System.currentTimeMillis()); + // Peek ahead to see if this is the last chunk + ChunkInfo nextSourceChunk = sourceScanner.getNextChunk(); + if (nextSourceChunk == null) { + isLastChunkOfRegion = true; + } + ChunkInfo targetChunk = getTargetChunkWithSourceBoundary(targetConnection, + previousSourceChunk == null ? rangeStart : previousSourceChunk.endKey, + isLastChunkOfRegion ? 
rangeEnd : sourceChunk.endKey, isTargetStartKeyInclusive, + !isLastChunkOfRegion); + context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).increment(sourceChunk.rowCount); + context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).increment(targetChunk.rowCount); + boolean matched = MessageDigest.isEqual(sourceChunk.hash, targetChunk.hash); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "isSourceStartKeyInclusive: {}, isTargetStartKeyInclusive: {}," + + "isTargetEndKeyInclusive: {}, isFirstChunkOfRegion: {}, isLastChunkOfRegion: {}." + + "Chunk comparison source {}, {}. Key range passed to target chunk: {}, {}." + + "target chunk returned {}, {}: source={} rows, target={} rows, matched={}", + isSourceStartKeyInclusive, isTargetStartKeyInclusive, !isLastChunkOfRegion, + previousSourceChunk == null, isLastChunkOfRegion, + Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey), + Bytes.toStringBinary( + previousSourceChunk == null ? rangeStart : previousSourceChunk.endKey), + Bytes.toStringBinary(isLastChunkOfRegion ? rangeEnd : sourceChunk.endKey), + Bytes.toStringBinary(targetChunk.startKey), Bytes.toStringBinary(targetChunk.endKey), + sourceChunk.rowCount, targetChunk.rowCount, matched); + } + sourceChunk.executionEndTime = new Timestamp(System.currentTimeMillis()); + String counters = formatChunkCounters(sourceChunk.rowCount, targetChunk.rowCount); + if (matched) { + handleVerifiedChunk(sourceChunk, context, counters); + } else { + handleMismatchedChunk(sourceChunk, context, counters); + } + previousSourceChunk = sourceChunk; + sourceChunk = nextSourceChunk; + // After first chunk, our target chunk boundary would be previousSourceChunk.endKey, + // so start key should not be inclusive + isTargetStartKeyInclusive = false; + context.progress(); + } + } + LOGGER.info("Completed sync table processing of Mapper region boundary {}, {}", + Bytes.toStringBinary(rangeStart), Bytes.toStringBinary(rangeEnd)); + } + + /** + * Scans target across multiple regions and returns a single combined ChunkInfo. Handles partial + * chunks by passing digest state to next scanner via scan attributes, enabling cross-region + * digest continuation. Since we are scanning rows based on source chunk boundary, it could be + * distributed across multiple target regions. We keep on creating scanner across target region + * until entire source chunk boundary is processed or chunk is null + * @param conn Target connection + * @param startKey Source chunk start key + * @param endKey Source chunk end key + * @return Single ChunkInfo with final hash from all target regions + */ + private ChunkInfo getTargetChunkWithSourceBoundary(Connection conn, byte[] startKey, + byte[] endKey, boolean isTargetStartKeyInclusive, boolean isTargetEndKeyInclusive) + throws IOException, SQLException { + ChunkInfo combinedTargetChunk = new ChunkInfo(); + combinedTargetChunk.startKey = null; + combinedTargetChunk.endKey = null; + combinedTargetChunk.hash = null; + combinedTargetChunk.rowCount = 0; + byte[] currentStartKey = startKey; + byte[] continuedDigestState = null; + ChunkInfo chunk; + while (true) { + // Each iteration scans one target region. The coprocessor processes all rows in + // that region within the scan range. For target boundary, the chunk is always + // marked partial and the digest state is passed to the next + // scanner for cross-region hash continuation. 
+ try (ChunkScannerContext scanner = createChunkScanner(conn, currentStartKey, endKey, + continuedDigestState, isTargetStartKeyInclusive, isTargetEndKeyInclusive, true)) { + chunk = scanner.getNextChunk(); + // chunk == null means no more rows in the target range. + // We must finalize the digest to produce a proper checksum for comparison. + if (chunk == null) { + if (continuedDigestState != null) { + combinedTargetChunk.hash = + SHA256DigestUtil.finalizeDigestToChecksum(continuedDigestState); + } + break; + } + if (combinedTargetChunk.startKey == null) { + combinedTargetChunk.startKey = chunk.startKey; + } + combinedTargetChunk.endKey = chunk.endKey; + combinedTargetChunk.rowCount += chunk.rowCount; + continuedDigestState = chunk.hash; + currentStartKey = chunk.endKey; + isTargetStartKeyInclusive = false; + } + } + return combinedTargetChunk; + } + + /** + * Creates a reusable scanner context for fetching chunks from a range. + * @param conn Connection to cluster (source or target) + * @param startKey Range start key (inclusive) + * @param endKey Range end key (exclusive) + * @param continuedDigestState If not null, coprocessor will continue hashing from this state (for + * cross-region continuation on target) + * @param isStartKeyInclusive Whether StartKey Inclusive + * @param isEndKeyInclusive Whether EndKey Inclusive + * @throws IOException scanner creation fails + * @throws SQLException hTable connection fails + */ + private ChunkScannerContext createChunkScanner(Connection conn, byte[] startKey, byte[] endKey, + byte[] continuedDigestState, boolean isStartKeyInclusive, boolean isEndKeyInclusive, + boolean isTargetScan) throws IOException, SQLException { + // Not using try-with-resources since ChunkScannerContext owns the table lifecycle + Table hTable = + conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(physicalTableName); + Scan scan = + createChunkScan(startKey, endKey, isStartKeyInclusive, isEndKeyInclusive, isTargetScan); + scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserverConstants.SKIP_REGION_BOUNDARY_CHECK, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG, TRUE_BYTES); + if (continuedDigestState != null && continuedDigestState.length > 0) { + scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE, + continuedDigestState); + } + + if (!isTargetScan) { + scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES, + Bytes.toBytes(chunkSizeBytes)); + } + long syncTablePageTimeoutMs = (long) (conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5); Review Comment: What is the basis for this multiplier? Should it be a configurable value? ########## phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java: ########## @@ -1207,6 +1207,10 @@ public static boolean isIndexRebuild(Scan scan) { return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null; } + public static boolean isSyncTableChunkFormation(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION) != null; Review Comment: I see you are explicitly setting to `TRUE_BYTES` but here you are just checking for existence, not a very good practice as it will break if someone sets it to `FALSE_BYTES` so better to be defensive and check the value. 
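As a concrete illustration of the defensive check being asked for, a minimal sketch (hypothetical standalone class, not the PR's implementation; it reuses the SYNC_TABLE_CHUNK_FORMATION constant added in this PR and compares the attribute value against TRUE_BYTES instead of only checking for presence):

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.schema.types.PDataType;

public final class SyncTableScanAttributeCheck {

  private SyncTableScanAttributeCheck() {
  }

  /**
   * Treats chunk formation as enabled only when the attribute is present AND equals TRUE_BYTES,
   * so a caller that explicitly sets FALSE_BYTES is not silently treated as enabled.
   */
  public static boolean isSyncTableChunkFormationEnabled(Scan scan) {
    byte[] attr =
        scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION);
    return attr != null && Bytes.equals(PDataType.TRUE_BYTES, attr);
  }
}
```

The method name here also anticipates the isSyncTableChunkFormationEnabled rename suggested in the following comment.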
########## phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java: ########## @@ -1207,6 +1207,10 @@ public static boolean isIndexRebuild(Scan scan) { return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null; } + public static boolean isSyncTableChunkFormation(Scan scan) { Review Comment: Do you mean isSyncTableChunkFormationEnabled? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + +import java.io.IOException; +import java.security.MessageDigest; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SHA256DigestUtil; +import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Mapper that acts as a driver for validating table data between source and target clusters. The + * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches + * chunk hashes from both clusters, compares them and write to checkpoint table. 
+ */ +public class PhoenixSyncTableMapper + extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class); + + public enum SyncCounters { + CHUNKS_VERIFIED, + CHUNKS_MISMATCHED, + SOURCE_ROWS_PROCESSED, + TARGET_ROWS_PROCESSED + } + + private String tableName; + private String targetZkQuorum; + private Long fromTime; + private Long toTime; + private boolean isDryRun; + private long chunkSizeBytes; + private Configuration conf; + private Connection sourceConnection; + private Connection targetConnection; + private Connection globalConnection; + private PTable pTable; + private byte[] physicalTableName; + private byte[] mapperRegionStart; + private byte[] mapperRegionEnd; + private PhoenixSyncTableOutputRepository syncTableOutputRepository; + private Timestamp mapperStartTime; + + @Override + protected void setup(Context context) throws InterruptedException { + try { + super.setup(context); + mapperStartTime = new Timestamp(System.currentTimeMillis()); + this.conf = context.getConfiguration(); + tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf); + targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf); + fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf); + toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf); + isDryRun = PhoenixConfigurationUtil.getPhoenixSyncTableDryRun(conf); + chunkSizeBytes = PhoenixConfigurationUtil.getPhoenixSyncTableChunkSizeBytes(conf); + extractRegionBoundariesFromSplit(context); + sourceConnection = ConnectionUtil.getInputConnection(conf); + pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName); + physicalTableName = pTable.getPhysicalName().getBytes(); + connectToTargetCluster(); + globalConnection = createGlobalConnection(conf); + syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection); + } catch (SQLException | IOException e) { Review Comment: I would include RuntimeException too, to be more aggressive in avoiding a resource leak. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java: ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Objects; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Data model class representing required row in the PHOENIX_SYNC_TABLE_CHECKPOINT table + */ +public class PhoenixSyncTableOutputRow { + + public enum Type { + CHUNK, + MAPPER_REGION + } + + public enum Status { + VERIFIED, + MISMATCHED + } + + private String tableName; + private String targetCluster; + private Type type; + private Long fromTime; + private Long toTime; + private Boolean isDryRun; + private byte[] startRowKey; + private byte[] endRowKey; + private Boolean isFirstRegion; + private Timestamp executionStartTime; + private Timestamp executionEndTime; + private Status status; + private String counters; + + @Override + public String toString() { + return String.format("SyncOutputRow[table=%s, target=%s, type=%s, start=%s, end=%s, status=%s]", + tableName, targetCluster, type, Bytes.toStringBinary(startRowKey), + Bytes.toStringBinary(endRowKey), status); + } + + @Override + @VisibleForTesting + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PhoenixSyncTableOutputRow that = (PhoenixSyncTableOutputRow) o; + return Objects.equals(tableName, that.tableName) + && Objects.equals(targetCluster, that.targetCluster) && type == that.type + && Objects.equals(fromTime, that.fromTime) && Objects.equals(toTime, that.toTime) + && Objects.equals(isDryRun, that.isDryRun) && Arrays.equals(startRowKey, that.startRowKey) + && Arrays.equals(endRowKey, that.endRowKey) + && Objects.equals(isFirstRegion, that.isFirstRegion) + && Objects.equals(executionStartTime, that.executionStartTime) + && Objects.equals(executionEndTime, that.executionEndTime) && status == that.status + && Objects.equals(counters, that.counters); + } + + @Override + public int hashCode() { + int result = Objects.hash(tableName, targetCluster, type, fromTime, toTime, isDryRun, + isFirstRegion, executionStartTime, executionEndTime, status, counters); + result = 31 * result + Arrays.hashCode(startRowKey); + result = 31 * result + Arrays.hashCode(endRowKey); + return result; + } + + @VisibleForTesting + public String getTableName() { + return tableName; + } + + @VisibleForTesting + public String getTargetCluster() { + return targetCluster; + } + + @VisibleForTesting + public Type getType() { + return type; + } + + @VisibleForTesting + public Long getFromTime() { + return fromTime; + } + + @VisibleForTesting + public Long getToTime() { + return toTime; + } + + public byte[] getStartRowKey() { + return startRowKey != null ? Arrays.copyOf(startRowKey, startRowKey.length) : null; + } + + public byte[] getEndRowKey() { + return endRowKey != null ? 
Arrays.copyOf(endRowKey, endRowKey.length) : null; + } + + @VisibleForTesting + public Timestamp getExecutionStartTime() { + return executionStartTime; + } + + @VisibleForTesting + public Timestamp getExecutionEndTime() { + return executionEndTime; + } + + @VisibleForTesting + public Status getStatus() { + return status; + } + + @VisibleForTesting + public String getCounters() { + return counters; + } + + @VisibleForTesting + public long getSourceRowsProcessed() { + return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name()); + } + + @VisibleForTesting + public long getTargetRowsProcessed() { + return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name()); + } + + @VisibleForTesting Review Comment: On a private function? ########## phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.util.Bytes; +import org.bouncycastle.crypto.digests.SHA256Digest; + +/** + * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk + * bundled SHA, since their digest can't be serialized/deserialized which is needed for + * PhoenixSyncTableTool for cross-region hash continuation. + */ +public class SHA256DigestUtil { + + /** + * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up Review Comment: I didn't verify this, but DeepWiki says it can be up to 309 bytes: https://deepwiki.com/search/is-there-an-upper-limit-to-the_7872e61f-4f3f-462e-b4e9-cb6cbed47bd8?mode=fast ########## phoenix-core-client/src/main/java/org/apache/phoenix/util/SHA256DigestUtil.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.util; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.util.Bytes; +import org.bouncycastle.crypto.digests.SHA256Digest; + +/** + * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk + * bundled SHA, since their digest can't be serialized/deserialized which is needed for + * PhoenixSyncTableTool for cross-region hash continuation. + */ +public class SHA256DigestUtil { + + /** + * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up + * to 128 bytes as buffer. + */ + public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128; + + /** + * Encodes a SHA256Digest state to a byte array with length prefix for validation. Format: [4-byte + * integer length][encoded digest state bytes] + * @param digest The digest whose state should be encoded + * @return Byte array containing integer length prefix + encoded state + */ + public static byte[] encodeDigestState(SHA256Digest digest) { + byte[] encoded = digest.getEncodedState(); + ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length); + buffer.putInt(encoded.length); + buffer.put(encoded); + return buffer.array(); + } + + /** + * Decodes a SHA256Digest state from a byte array. + * @param encodedState Byte array containing 4-byte integer length prefix + encoded state + * @return SHA256Digest restored to the saved state + * @throws IOException if state is invalid, corrupted + */ + public static SHA256Digest decodeDigestState(byte[] encodedState) throws IOException { + if (encodedState == null) { + throw new IllegalArgumentException("Invalid encoded digest state: encodedState is null"); + } + + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState)); + int stateLength = dis.readInt(); + // Prevent malicious large allocations + if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) { + throw new IllegalArgumentException( + String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength, + MAX_SHA256_DIGEST_STATE_SIZE)); + } + + byte[] state = new byte[stateLength]; + dis.readFully(state); Review Comment: Following my suggestion in encode, this will simply become: ```suggestion int stateLength = encodedState[0] & 0xff; // Prevent malicious large allocations if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) { throw new IllegalArgumentException( String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength, MAX_SHA256_DIGEST_STATE_SIZE)); } byte[] state = new byte[stateLength]; System.arraycopy(encodedState, 1, state, 0, stateLength); ``` ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status; +import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. 
+ * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n" + + " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n" + + " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n" + + " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n" + + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + " TARGET_CLUSTER,\n" + + " TYPE ,\n" + " FROM_TIME,\n" + " TO_TIME,\n" + + " START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(String tableName, String targetCluster, Type type, + Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] endKey, Status status, + Timestamp executionStartTime, Timestamp executionEndTime, String counters) throws SQLException { + + // Validate required parameters + if (tableName == null || tableName.isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (targetCluster == null || targetCluster.isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (type == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (fromTime == null || toTime == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, tableName); + ps.setString(2, targetCluster); + ps.setString(3, type.name()); + ps.setLong(4, fromTime); + ps.setLong(5, toTime); + ps.setBytes(6, startKey); + ps.setBytes(7, endKey); + ps.setBoolean(8, isDryRun); + ps.setTimestamp(9, executionStartTime); + ps.setTimestamp(10, executionEndTime); + ps.setString(11, status != null ? status.name() : null); + ps.setString(12, counters); + ps.executeUpdate(); + connection.commit(); + } + } + + /** + * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out + * already-processed regions. + * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp (nullable) + * @param toTime End timestamp (nullable) + * @return List of completed mapper regions + */ + public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String tableName, + String targetCluster, Long fromTime, Long toTime) throws SQLException { + + String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?" + + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? 
AND STATUS IN ( ?, ?)"; + List<PhoenixSyncTableOutputRow> results = new ArrayList<>(); + try (PreparedStatement ps = connection.prepareStatement(query)) { + int paramIndex = 1; + ps.setString(paramIndex++, tableName); + ps.setString(paramIndex++, targetCluster); + ps.setString(paramIndex++, Type.MAPPER_REGION.name()); + ps.setLong(paramIndex++, fromTime); + ps.setLong(paramIndex++, toTime); + ps.setString(paramIndex++, Status.VERIFIED.name()); + ps.setString(paramIndex, Status.MISMATCHED.name()); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + PhoenixSyncTableOutputRow row = + new PhoenixSyncTableOutputRow.Builder().setStartRowKey(rs.getBytes("START_ROW_KEY")) + .setEndRowKey(rs.getBytes("END_ROW_KEY")).build(); + results.add(row); + } + } + } + return results; + } + + /** + * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks. + * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp (nullable) + * @param toTime End timestamp (nullable) + * @param mapperRegionStart Mapper region start key + * @param mapperRegionEnd Mapper region end key + * @return List of processed chunks in the region + */ + public List<PhoenixSyncTableOutputRow> getProcessedChunks(String tableName, String targetCluster, + Long fromTime, Long toTime, byte[] mapperRegionStart, byte[] mapperRegionEnd) + throws SQLException { + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? " + + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?"); + + // Check if mapper region boundaries are non-empty (i.e., NOT first/last regions) + // Only add boundary conditions for non-empty boundaries + boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length > 0; + boolean hasStartBoundary = mapperRegionStart != null && mapperRegionStart.length > 0; + + // Filter chunks that overlap with this mapper region: + // - Chunk overlaps if: chunkStart < mapperRegionEnd (when end boundary exists) + // - Chunk overlaps if: chunkEnd > mapperRegionStart (when start boundary exists) + if (hasEndBoundary) { + queryBuilder.append(" AND START_ROW_KEY <= ?"); + } + if (hasStartBoundary) { + queryBuilder.append(" AND END_ROW_KEY >= ?"); Review Comment: For last region, there will be no constraint on START_ROW_KEY and END_ROW_KEY is not part of the PK, so this can perform poorly, I think. ########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixSyncTableRegionScanner.java: ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES; +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; +import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; +import static org.apache.phoenix.util.ScanUtil.getDummyResult; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PhoenixKeyValueUtil; +import org.apache.phoenix.util.SHA256DigestUtil; +import org.apache.phoenix.util.ScanUtil; +import org.bouncycastle.crypto.digests.SHA256Digest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Server-side coprocessor that performs chunk formation and SHA-256 hashing for + * PhoenixSyncTableTool. + * <p> + * Accumulates rows into chunks (based on size limits) and computes a hash of all row data (keys, + * column families, qualifiers, timestamps, cell types, values). In case of paging timeout, return + * whatever is accumulated in chunk. If nothing is accumulated return dummy row either with prev + * result rowKey or max possible key < currentRowKey + * <p> + * Source scan (isTargetScan=false): Returns complete chunks(if paging dint timeout) bounded by + * region boundaries. Sets hasMoreRows=false when region is exhausted. + * <p> + * Target scan (isTargetScan=true): Returns partial chunks with serialized digest state when region + * boundary is reached, allowing cross-region hash continuation. 
+ * <p> + * Returns chunk metadata cells: START_KEY, END_KEY, HASH (or digest state), ROW_COUNT, + * IS_PARTIAL_CHUNK + */ +public class PhoenixSyncTableRegionScanner extends BaseRegionScanner { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableRegionScanner.class); + private static final byte[] CHUNK_METADATA_FAMILY = SINGLE_COLUMN_FAMILY; + private final Region region; + private final Scan scan; + private final RegionCoprocessorEnvironment env; + private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; + private final long chunkSizeBytes; + private boolean isTargetScan = false; + private byte[] chunkStartKey = null; + private byte[] chunkEndKey = null; + private long currentChunkSize = 0L; + private long currentChunkRowCount = 0L; + private final SHA256Digest digest; + private boolean hasMoreRows = true; + private boolean isUsingContinuedDigest; + private byte[] previousResultRowKey = null; + private final byte[] initStartRowKey; + private final boolean includeInitStartRowKey; + private final long pageSizeMs; + + /** + * Creates a PhoenixSyncTableRegionScanner for chunk-based hashing. + * @param innerScanner The underlying region scanner + * @param region The region being scanned + * @param scan The scan request + * @param env The coprocessor environment + * @param ungroupedAggregateRegionObserver Parent observer for region state checks + * @throws IllegalStateException if digest state restoration fails + */ + @VisibleForTesting + public PhoenixSyncTableRegionScanner(final RegionScanner innerScanner, final Region region, + final Scan scan, final RegionCoprocessorEnvironment env, + final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver, long pageSizeMs) { + super(innerScanner); + this.region = region; + this.scan = scan; + this.env = env; + this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; + byte[] chunkSizeAttr = + scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES); + if (chunkSizeAttr == null) { // Since we don't set chunk size scan attr for target cluster scan + this.isTargetScan = true; + } + this.chunkSizeBytes = chunkSizeAttr != null + ? Bytes.toLong(chunkSizeAttr) + : DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES; + + // Check if we should continue from a previous digest state (cross-region continuation) + byte[] continuedDigestStateAttr = + scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE); + if (continuedDigestStateAttr != null) { + try { + this.digest = SHA256DigestUtil.decodeDigestState(continuedDigestStateAttr); + this.isUsingContinuedDigest = true; + } catch (IOException e) { + throw new IllegalStateException("Failed to restore continued digest state", e); + } + } else { + this.digest = new SHA256Digest(); + this.isUsingContinuedDigest = false; + } + this.initStartRowKey = scan.getStartRow(); + this.includeInitStartRowKey = scan.includeStartRow(); + this.pageSizeMs = pageSizeMs; + } + + @Override + public boolean next(List<Cell> results) throws IOException { + return next(results, null); + } + + /** + * Accumulates rows into a chunk and returns chunk metadata cells. Supports server-side paging via + * {@link PhoenixScannerContext} following the same pattern as + * {@link GroupedAggregateRegionObserver} and {@link UncoveredIndexRegionScanner}. 
+ * @param results Output list to populate with chunk metadata cells + * @param scannerContext Phoenix scanner context for paging timeout detection + * @return true if more chunks available, false if scanning complete + */ + @Override + public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException { + region.startRegionOperation(); + try { + resetChunkState(); + RegionScanner localScanner = delegate; + synchronized (localScanner) { + List<Cell> rowCells = new ArrayList<>(); + while (hasMoreRows) { + ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting(); + rowCells.clear(); + hasMoreRows = (scannerContext == null) + ? localScanner.nextRaw(rowCells) + : localScanner.nextRaw(rowCells, scannerContext); + + if (!rowCells.isEmpty() && ScanUtil.isDummy(rowCells)) { + if (chunkStartKey == null) { + LOGGER.warn("Paging timed out while fetching first row of chunk, initStartRowKey: {}", + Bytes.toStringBinary(initStartRowKey)); + updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan); + return true; + } else { + break; + } + } + + if (rowCells.isEmpty()) { + break; + } + + byte[] rowKey = CellUtil.cloneRow(rowCells.get(0)); + long rowSize = calculateRowSize(rowCells); + addRowToChunk(rowKey, rowCells, rowSize); + if (!isTargetScan && willExceedChunkLimits(rowSize)) { + break; + } + if ( + hasMoreRows && (PhoenixScannerContext.isReturnImmediately(scannerContext) + || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) + ) { + LOGGER.info("Paging timeout after {} rows ({} bytes) in region {}, chunk [{}:{}]", + currentChunkRowCount, currentChunkSize, + region.getRegionInfo().getRegionNameAsString(), Bytes.toStringBinary(chunkStartKey), + Bytes.toStringBinary(chunkEndKey)); + PhoenixScannerContext.setReturnImmediately(scannerContext); + break; + } + } + } + if (chunkStartKey == null) { + return false; + } + + buildChunkMetadataResult(results, isTargetScan); + previousResultRowKey = chunkEndKey; + return hasMoreRows; + } catch (Throwable t) { + LOGGER.error( + "Exception during chunk scanning in region {} table {} at chunk startKey: {}, endKey: {})", + region.getRegionInfo().getRegionNameAsString(), + region.getRegionInfo().getTable().getNameAsString(), + chunkStartKey != null ? Bytes.toStringBinary(chunkStartKey) : "null", + chunkEndKey != null ? Bytes.toStringBinary(chunkEndKey) : "null", t); + throw t; + } finally { + region.closeRegionOperation(); + } + } + + /** + * Resets chunk state for a new chunk. Note: If this scanner was initialized with continued digest + * state, the first call to this method will NOT reset the digest, allowing us to continue hashing + * from the previous region's state. 
+ */ + private void resetChunkState() { + chunkStartKey = null; + chunkEndKey = null; + currentChunkSize = 0; + currentChunkRowCount = 0; + if (!isUsingContinuedDigest) { + digest.reset(); + } + isUsingContinuedDigest = false; + } + + private long calculateRowSize(List<Cell> cells) { + long size = 0; + for (Cell cell : cells) { + size += PrivateCellUtil.estimatedSerializedSizeOf(cell); + } + return size; + } + + private boolean willExceedChunkLimits(long rowSize) { + return currentChunkSize + rowSize > chunkSizeBytes; + } + + /** + * Adds a row to the current chunk and updates digest + */ + private void addRowToChunk(byte[] rowKey, List<Cell> cells, long rowSize) { + // Set chunk start key on first row + if (chunkStartKey == null) { + chunkStartKey = rowKey; + } + chunkEndKey = rowKey; + currentChunkSize += rowSize; + currentChunkRowCount++; + updateDigestWithRow(rowKey, cells); + } + + /** + * Updates the SHA-256 digest with data from a row. Hash includes: row key + cell family + cell + * qualifier + cell timestamp + cell type + cell value. This ensures that any difference in the + * data will result in different hashes. + */ + private void updateDigestWithRow(byte[] rowKey, List<Cell> cells) { + digest.update(rowKey, 0, rowKey.length); + byte[] timestampBuffer = new byte[8]; + for (Cell cell : cells) { + digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); + digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + long ts = cell.getTimestamp(); + Bytes.putLong(timestampBuffer, 0, ts); + digest.update(timestampBuffer, 0, 8); + digest.update(cell.getType().getCode()); + digest.update(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + } + } + + /** + * Builds chunk metadata result cells and adds them to the results list. Returns a single + * "row"[rowKey=chunkEndKey] with multiple cells containing chunk metadata[chunkStartKey, + * hash/digest, rowCount, isPartialChunk]. 
For complete chunks: includes final SHA-256 hash (32 + * bytes) For partial chunks: includes serialized MessageDigest state for continuation + * @param results Output list to populate with chunk metadata cells + * @param isPartialChunk true if this is a partial chunk (region boundary reached before + * completion) + */ + private void buildChunkMetadataResult(List<Cell> results, boolean isPartialChunk) + throws IOException { + byte[] resultRowKey = this.chunkEndKey; + results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY, + BaseScannerRegionObserverConstants.SYNC_TABLE_START_KEY_QUALIFIER, AGG_TIMESTAMP, + chunkStartKey)); + results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY, + BaseScannerRegionObserverConstants.SYNC_TABLE_ROW_COUNT_QUALIFIER, AGG_TIMESTAMP, + Bytes.toBytes(currentChunkRowCount))); + if (isPartialChunk) { + // Partial chunk digest + byte[] digestState = SHA256DigestUtil.encodeDigestState(digest); + results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY, + BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER, AGG_TIMESTAMP, + TRUE_BYTES)); + results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY, + BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER, AGG_TIMESTAMP, digestState)); + } else { + // Complete chunk - finalize and return hash + byte[] hash = SHA256DigestUtil.finalizeDigestToChecksum(digest); + results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY, + BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER, AGG_TIMESTAMP, hash)); + results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY, + BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER, AGG_TIMESTAMP, + FALSE_BYTES)); + } + } + + /** + * Add dummy cell to the result list based on either the previous rowKey returned to the client or + * the start rowKey and start rowKey include params. + * @param result result to add the dummy cell to. + * @param initStartRowKey scan start rowKey. + * @param includeInitStartRowKey scan start rowKey included. + * @param scan scan object. + */ + private void updateDummyWithPrevRowKey(List<Cell> result, byte[] initStartRowKey, + boolean includeInitStartRowKey, Scan scan) { + result.clear(); + if (previousResultRowKey != null) { + getDummyResult(previousResultRowKey, result); + } else { + if (includeInitStartRowKey && initStartRowKey.length > 0) { + byte[] prevKey; + // In order to generate largest possible rowkey that is less than + // initStartRowKey, we need to check size of the region name that can be + // used by hbase client for meta lookup, in case meta cache is expired at client. + // Once we know regionLookupInMetaLen, use it to generate largest possible + // rowkey that is lower than initStartRowKey by using + // ByteUtil#previousKeyWithLength function, which appends "\\xFF" bytes to + // prev rowKey up to the length provided. e.g. for the given key + // "\\x01\\xC1\\x06", the previous key with length 5 would be + // "\\x01\\xC1\\x05\\xFF\\xFF" by padding 2 bytes "\\xFF". + // The length of the largest scan start rowkey should not exceed + // HConstants#MAX_ROW_LENGTH. 
+ int regionLookupInMetaLen = + RegionInfo.createRegionName(region.getTableDescriptor().getTableName(), new byte[1], + HConstants.NINES, false).length; + if ( + Bytes.compareTo(initStartRowKey, initStartRowKey.length - 1, 1, ByteUtil.ZERO_BYTE, 0, 1) + == 0 + ) { + // If initStartRowKey has last byte as "\\x00", we can discard the last + // byte and send the key as dummy rowKey. + prevKey = new byte[initStartRowKey.length - 1]; + System.arraycopy(initStartRowKey, 0, prevKey, 0, prevKey.length); + } else + if (initStartRowKey.length < (HConstants.MAX_ROW_LENGTH - 1 - regionLookupInMetaLen)) { + prevKey = + ByteUtil.previousKeyWithLength( + ByteUtil.concat(initStartRowKey, + new byte[HConstants.MAX_ROW_LENGTH - initStartRowKey.length - 1 + - regionLookupInMetaLen]), + HConstants.MAX_ROW_LENGTH - 1 - regionLookupInMetaLen); + } else { + prevKey = initStartRowKey; + } + getDummyResult(prevKey, result); Review Comment: Can you move the logic to derive `prevKey` as something like `ScanUtil.getEffectivePrevKeyForPaging` and add unit tests for each scenario? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
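To make the review suggestion above about extracting the prevKey derivation more concrete, here is a rough, hypothetical sketch of what such a helper and a unit test could look like. The name getEffectivePrevKeyForPaging comes from the review comment itself; the class name, the exact signature, and the regionLookupInMetaLen parameter are assumptions for illustration, not the PR's implementation. The body simply restates the derivation quoted in the diff above, with the region-name length passed in so each branch can be tested without a Region.

import static org.junit.Assert.assertArrayEquals;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.ByteUtil;
import org.junit.Test;

public class EffectivePrevKeyForPagingSketch {

  // Hypothetical extraction: largest possible rowkey smaller than initStartRowKey that still
  // fits within HConstants.MAX_ROW_LENGTH once the meta-lookup region name overhead
  // (regionLookupInMetaLen) is reserved. Assumes initStartRowKey is non-empty, which the
  // caller already checks before invoking this logic.
  public static byte[] getEffectivePrevKeyForPaging(byte[] initStartRowKey,
      int regionLookupInMetaLen) {
    // Case 1: start key ends in 0x00, so dropping the trailing byte already gives the
    // largest key smaller than initStartRowKey.
    if (Bytes.compareTo(initStartRowKey, initStartRowKey.length - 1, 1,
        ByteUtil.ZERO_BYTE, 0, 1) == 0) {
      byte[] prevKey = new byte[initStartRowKey.length - 1];
      System.arraycopy(initStartRowKey, 0, prevKey, 0, prevKey.length);
      return prevKey;
    }
    int maxUsableLength = HConstants.MAX_ROW_LENGTH - 1 - regionLookupInMetaLen;
    // Case 2: there is room left, so zero-pad to the maximum usable length and take the
    // previous key at that length (ByteUtil#previousKeyWithLength pads with 0xFF bytes).
    if (initStartRowKey.length < maxUsableLength) {
      return ByteUtil.previousKeyWithLength(
        ByteUtil.concat(initStartRowKey, new byte[maxUsableLength - initStartRowKey.length]),
        maxUsableLength);
    }
    // Case 3: the key is already at the maximum usable length; fall back to the key itself.
    return initStartRowKey;
  }

  @Test
  public void returnsTruncatedKeyWhenStartKeyEndsInZeroByte() {
    byte[] startKey = new byte[] { 0x01, (byte) 0xC1, 0x00 };
    assertArrayEquals(new byte[] { 0x01, (byte) 0xC1 },
      getEffectivePrevKeyForPaging(startKey, 32));
  }
}

Keeping the helper free of Region/RegionInfo dependencies (the caller would still compute regionLookupInMetaLen via RegionInfo.createRegionName, as the scanner does today) is what would make the three branches independently unit-testable.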

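On the continued-digest handling in PhoenixSyncTableRegionScanner: the cross-region continuation via SYNC_TABLE_CONTINUED_DIGEST_STATE relies on the BouncyCastle digest's internal state being capturable and restorable. The standalone snippet below is a hypothetical illustration of that property only; it uses org.bouncycastle.crypto.digests.SHA256Digest directly and does not claim to reproduce the exact encoding used by the PR's SHA256DigestUtil wrapper. It shows that resuming from a captured state yields the same hash as a single pass over all the data.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.bouncycastle.crypto.digests.SHA256Digest;

public class DigestContinuationDemo {
  public static void main(String[] args) {
    byte[] regionOneRows = "rows hashed in region 1".getBytes(StandardCharsets.UTF_8);
    byte[] regionTwoRows = "rows hashed in region 2".getBytes(StandardCharsets.UTF_8);

    // Reference: hash everything in a single pass.
    SHA256Digest singlePass = new SHA256Digest();
    singlePass.update(regionOneRows, 0, regionOneRows.length);
    singlePass.update(regionTwoRows, 0, regionTwoRows.length);
    byte[] expected = new byte[singlePass.getDigestSize()];
    singlePass.doFinal(expected, 0);

    // Hash the first part, capture the internal state at the "region boundary",
    // then restore it into a fresh digest and continue with the second part.
    SHA256Digest first = new SHA256Digest();
    first.update(regionOneRows, 0, regionOneRows.length);
    byte[] stateAtBoundary = first.getEncodedState();

    SHA256Digest resumed = new SHA256Digest(stateAtBoundary);
    resumed.update(regionTwoRows, 0, regionTwoRows.length);
    byte[] actual = new byte[resumed.getDigestSize()];
    resumed.doFinal(actual, 0);

    // Prints true: the resumed digest matches the single-pass digest.
    System.out.println(Arrays.equals(expected, actual));
  }
}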