kadirozde commented on code in PR #2133: URL: https://github.com/apache/phoenix/pull/2133#discussion_r2067068121
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * Default Codec for encoding and decoding ReplicationLog Records within a block buffer. + * This implementation uses standard Java DataInput/DataOutput for serialization. + * + * Record Format within a block: + * <pre> + * +--------------------------------------------+ + * | RECORD LENGTH (vint) | + * +--------------------------------------------+ + * | RECORD HEADER | + * | - Mutation type (byte) | + * | - HBase table name length (vint) | + * | - HBase table name (byte[]) | + * | - Transaction/SCN or commit ID (vint) | + * +--------------------------------------------+ + * | ROW KEY LENGTH (vint) | + * | ROW KEY (byte[]) | + * +--------------------------------------------+ + * | MUTATION TIMESTAMP (vint) | + * +--------------------------------------------+ + * | NUMBER OF COLUMN FAMILIES CHANGED (vint) | + * +--------------------------------------------+ + * | PER-FAMILY DATA (repeated) | + * | +--------------------------------------+ | + * | | COLUMN FAMILY NAME LENGTH (vint) | | + * | | COLUMN FAMILY NAME (byte[]) | | + * | | NUMBER OF CELLS IN FAMILY (vint) | | + * | +--------------------------------------+ | + * | | PER-CELL DATA (repeated) | | + * | | +––––––––––––----------------–--–+ | | + * | | | COLUMN QUALIFIER LENGTH (vint) | | | + * | | | COLUMN QUALIFIER (byte[]) | | | + * | | | VALUE LENGTH (vint) | | | + * | | | VALUE (byte[]) | | | + * | | +–––––––––––––--------------––--–+ | | + * | +--------------------------------------+ | + * +--------------------------------------------+ + * </pre> + */ +public class LogFileCodec implements LogFile.Codec { + + @Override + public Encoder getEncoder(DataOutput out) { + return new RecordEncoder(out); + } + + @Override + public Decoder getDecoder(DataInput in) { + return new RecordDecoder(in); + } + + @Override + public Decoder getDecoder(ByteBuffer buffer) { + // We use HBase ByteBuff and ByteBuffInputStream which avoids copying, because our buffers + // are known to be heap based. + ByteBuff wrapByteBuf = ByteBuff.wrap(buffer); + try { + return new RecordDecoder(new DataInputStream(new ByteBuffInputStream(wrapByteBuf))); + } finally { + wrapByteBuf.release(); + } + } + + private static class RecordEncoder implements LogFile.Codec.Encoder { + private final DataOutput out; + private final ByteArrayOutputStream currentRecord; + + RecordEncoder(DataOutput out) { + this.out = out; + currentRecord = new ByteArrayOutputStream(); + } + + + @Override + public void write(LogFile.Record record) throws IOException { + + DataOutput recordOut = new DataOutputStream(currentRecord); + + // Write record fields + + Mutation mutation = record.getMutation(); + LogFileRecord.MutationType mutationType = LogFileRecord.MutationType.get(mutation); + recordOut.writeByte(mutationType.getCode()); + byte[] nameBytes = record.getHBaseTableName().getBytes(StandardCharsets.UTF_8); + WritableUtils.writeVInt(recordOut, nameBytes.length); + recordOut.write(nameBytes); + WritableUtils.writeVLong(recordOut, record.getCommitId()); + byte[] rowKey = mutation.getRow(); + WritableUtils.writeVInt(recordOut, rowKey.length); + recordOut.write(rowKey); + recordOut.writeLong(mutation.getTimestamp()); + + Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); + int cfCount = familyMap.size(); + WritableUtils.writeVInt(recordOut, cfCount); + + for (Map.Entry<byte[], List<Cell>> entry: familyMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + WritableUtils.writeVInt(recordOut, columnFamily.length); + recordOut.write(columnFamily); + List<Cell> cells = entry.getValue(); + WritableUtils.writeVInt(recordOut, cells.size()); + for (Cell cell: cells) { + WritableUtils.writeVInt(recordOut, cell.getQualifierLength()); + recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + WritableUtils.writeVInt(recordOut, cell.getValueLength()); + if (cell.getValueLength() > 0) { + recordOut.write(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength()); + } + } + } + + byte[] currentRecordBytes = currentRecord.toByteArray(); + // Write total record length + WritableUtils.writeVInt(out, currentRecordBytes.length); + // Write the record + out.write(currentRecordBytes); + + // Set the size (including the vint prefix) on the record object + ((LogFileRecord) record).setSerializedLength(currentRecordBytes.length + + WritableUtils.getVIntSize(currentRecordBytes.length)); + + // Reset the ByteArrayOutputStream to release resources + currentRecord.reset(); + } + } + + private static class RecordDecoder implements LogFile.Codec.Decoder { + private final DataInput in; + // A reference to the object populated by the last successful advance() + private LogFileRecord current = null; + + RecordDecoder(DataInput in) { + this.in = in; + } + + @Override + public boolean advance(LogFile.Record reuse) throws IOException { + try { + int recordDataLength = WritableUtils.readVInt(in); + recordDataLength += WritableUtils.getVIntSize(recordDataLength); + + // If we are reusing a record object, prepare it for new data + if (reuse == null || !(reuse instanceof LogFileRecord)) { + current = new LogFileRecord(); + } else { + current = (LogFileRecord) reuse; + } + // Set the total serialized length on the record + current.setSerializedLength(recordDataLength); + + LogFileRecord.MutationType type = + LogFileRecord.MutationType.codeToType(in.readByte()); + + int nameBytesLen = WritableUtils.readVInt(in); + byte[] nameBytes = new byte[nameBytesLen]; + in.readFully(nameBytes); + + current.setHBaseTableName(Bytes.toString(nameBytes)); + + current.setCommitId(WritableUtils.readVLong(in)); + + int rowKeyLen = WritableUtils.readVInt(in); + byte[] rowKey = new byte[rowKeyLen]; + in.readFully(rowKey); + + Mutation mutation; + switch (type) { + case PUT: + mutation = new Put(rowKey); + break; + case DELETE: + case DELETEFAMILYVERSION: + case DELETECOLUMN: + case DELETEFAMILY: + mutation = new Delete(rowKey); + break; + default: + throw new UnsupportedOperationException("Unhandled mutation type " + type); + } + current.setMutation(mutation); + + long ts = in.readLong(); + mutation.setTimestamp(ts); + + int colCount = WritableUtils.readVInt(in); + for (int i = 0; i < colCount; i++) { + // Col name + int columnLen = WritableUtils.readVInt(in); + byte[] column = new byte[columnLen]; Review Comment: cf instead of column ########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * Default Codec for encoding and decoding ReplicationLog Records within a block buffer. + * This implementation uses standard Java DataInput/DataOutput for serialization. + * + * Record Format within a block: + * <pre> + * +--------------------------------------------+ + * | RECORD LENGTH (vint) | + * +--------------------------------------------+ + * | RECORD HEADER | + * | - Mutation type (byte) | + * | - HBase table name length (vint) | + * | - HBase table name (byte[]) | + * | - Transaction/SCN or commit ID (vint) | + * +--------------------------------------------+ + * | ROW KEY LENGTH (vint) | + * | ROW KEY (byte[]) | + * +--------------------------------------------+ + * | MUTATION TIMESTAMP (vint) | + * +--------------------------------------------+ + * | NUMBER OF COLUMN FAMILIES CHANGED (vint) | + * +--------------------------------------------+ + * | PER-FAMILY DATA (repeated) | + * | +--------------------------------------+ | + * | | COLUMN FAMILY NAME LENGTH (vint) | | + * | | COLUMN FAMILY NAME (byte[]) | | + * | | NUMBER OF CELLS IN FAMILY (vint) | | + * | +--------------------------------------+ | + * | | PER-CELL DATA (repeated) | | + * | | +––––––––––––----------------–--–+ | | + * | | | COLUMN QUALIFIER LENGTH (vint) | | | + * | | | COLUMN QUALIFIER (byte[]) | | | + * | | | VALUE LENGTH (vint) | | | + * | | | VALUE (byte[]) | | | + * | | +–––––––––––––--------------––--–+ | | + * | +--------------------------------------+ | + * +--------------------------------------------+ + * </pre> + */ +public class LogFileCodec implements LogFile.Codec { + + @Override + public Encoder getEncoder(DataOutput out) { + return new RecordEncoder(out); + } + + @Override + public Decoder getDecoder(DataInput in) { + return new RecordDecoder(in); + } + + @Override + public Decoder getDecoder(ByteBuffer buffer) { + // We use HBase ByteBuff and ByteBuffInputStream which avoids copying, because our buffers + // are known to be heap based. + ByteBuff wrapByteBuf = ByteBuff.wrap(buffer); + try { + return new RecordDecoder(new DataInputStream(new ByteBuffInputStream(wrapByteBuf))); + } finally { + wrapByteBuf.release(); + } + } + + private static class RecordEncoder implements LogFile.Codec.Encoder { + private final DataOutput out; + private final ByteArrayOutputStream currentRecord; + + RecordEncoder(DataOutput out) { + this.out = out; + currentRecord = new ByteArrayOutputStream(); + } + + + @Override + public void write(LogFile.Record record) throws IOException { + + DataOutput recordOut = new DataOutputStream(currentRecord); + + // Write record fields + + Mutation mutation = record.getMutation(); + LogFileRecord.MutationType mutationType = LogFileRecord.MutationType.get(mutation); + recordOut.writeByte(mutationType.getCode()); + byte[] nameBytes = record.getHBaseTableName().getBytes(StandardCharsets.UTF_8); + WritableUtils.writeVInt(recordOut, nameBytes.length); + recordOut.write(nameBytes); + WritableUtils.writeVLong(recordOut, record.getCommitId()); + byte[] rowKey = mutation.getRow(); + WritableUtils.writeVInt(recordOut, rowKey.length); + recordOut.write(rowKey); + recordOut.writeLong(mutation.getTimestamp()); + + Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); + int cfCount = familyMap.size(); + WritableUtils.writeVInt(recordOut, cfCount); + + for (Map.Entry<byte[], List<Cell>> entry: familyMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + WritableUtils.writeVInt(recordOut, columnFamily.length); + recordOut.write(columnFamily); + List<Cell> cells = entry.getValue(); + WritableUtils.writeVInt(recordOut, cells.size()); + for (Cell cell: cells) { + WritableUtils.writeVInt(recordOut, cell.getQualifierLength()); + recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + WritableUtils.writeVInt(recordOut, cell.getValueLength()); + if (cell.getValueLength() > 0) { + recordOut.write(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength()); + } + } + } + + byte[] currentRecordBytes = currentRecord.toByteArray(); + // Write total record length + WritableUtils.writeVInt(out, currentRecordBytes.length); + // Write the record + out.write(currentRecordBytes); + + // Set the size (including the vint prefix) on the record object + ((LogFileRecord) record).setSerializedLength(currentRecordBytes.length + + WritableUtils.getVIntSize(currentRecordBytes.length)); + + // Reset the ByteArrayOutputStream to release resources + currentRecord.reset(); + } + } + + private static class RecordDecoder implements LogFile.Codec.Decoder { + private final DataInput in; + // A reference to the object populated by the last successful advance() + private LogFileRecord current = null; + + RecordDecoder(DataInput in) { + this.in = in; + } + + @Override + public boolean advance(LogFile.Record reuse) throws IOException { + try { + int recordDataLength = WritableUtils.readVInt(in); + recordDataLength += WritableUtils.getVIntSize(recordDataLength); + + // If we are reusing a record object, prepare it for new data + if (reuse == null || !(reuse instanceof LogFileRecord)) { + current = new LogFileRecord(); + } else { + current = (LogFileRecord) reuse; + } + // Set the total serialized length on the record + current.setSerializedLength(recordDataLength); + + LogFileRecord.MutationType type = + LogFileRecord.MutationType.codeToType(in.readByte()); + + int nameBytesLen = WritableUtils.readVInt(in); + byte[] nameBytes = new byte[nameBytesLen]; + in.readFully(nameBytes); + + current.setHBaseTableName(Bytes.toString(nameBytes)); + + current.setCommitId(WritableUtils.readVLong(in)); + + int rowKeyLen = WritableUtils.readVInt(in); + byte[] rowKey = new byte[rowKeyLen]; + in.readFully(rowKey); + + Mutation mutation; + switch (type) { + case PUT: + mutation = new Put(rowKey); + break; + case DELETE: + case DELETEFAMILYVERSION: + case DELETECOLUMN: + case DELETEFAMILY: + mutation = new Delete(rowKey); + break; + default: + throw new UnsupportedOperationException("Unhandled mutation type " + type); + } + current.setMutation(mutation); + + long ts = in.readLong(); + mutation.setTimestamp(ts); + + int colCount = WritableUtils.readVInt(in); + for (int i = 0; i < colCount; i++) { + // Col name + int columnLen = WritableUtils.readVInt(in); + byte[] column = new byte[columnLen]; + in.readFully(column); + // Qualifiers+Values Count + int valuesCount = WritableUtils.readVInt(in); Review Comment: columnValuePairsCount instead of valuesCount ########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * Default Codec for encoding and decoding ReplicationLog Records within a block buffer. + * This implementation uses standard Java DataInput/DataOutput for serialization. + * + * Record Format within a block: + * <pre> + * +--------------------------------------------+ + * | RECORD LENGTH (vint) | + * +--------------------------------------------+ + * | RECORD HEADER | + * | - Mutation type (byte) | + * | - HBase table name length (vint) | + * | - HBase table name (byte[]) | + * | - Transaction/SCN or commit ID (vint) | + * +--------------------------------------------+ + * | ROW KEY LENGTH (vint) | + * | ROW KEY (byte[]) | + * +--------------------------------------------+ + * | MUTATION TIMESTAMP (vint) | + * +--------------------------------------------+ + * | NUMBER OF COLUMN FAMILIES CHANGED (vint) | + * +--------------------------------------------+ + * | PER-FAMILY DATA (repeated) | + * | +--------------------------------------+ | + * | | COLUMN FAMILY NAME LENGTH (vint) | | + * | | COLUMN FAMILY NAME (byte[]) | | + * | | NUMBER OF CELLS IN FAMILY (vint) | | + * | +--------------------------------------+ | + * | | PER-CELL DATA (repeated) | | + * | | +––––––––––––----------------–--–+ | | + * | | | COLUMN QUALIFIER LENGTH (vint) | | | + * | | | COLUMN QUALIFIER (byte[]) | | | + * | | | VALUE LENGTH (vint) | | | + * | | | VALUE (byte[]) | | | + * | | +–––––––––––––--------------––--–+ | | + * | +--------------------------------------+ | + * +--------------------------------------------+ + * </pre> + */ +public class LogFileCodec implements LogFile.Codec { + + @Override + public Encoder getEncoder(DataOutput out) { + return new RecordEncoder(out); + } + + @Override + public Decoder getDecoder(DataInput in) { + return new RecordDecoder(in); + } + + @Override + public Decoder getDecoder(ByteBuffer buffer) { + // We use HBase ByteBuff and ByteBuffInputStream which avoids copying, because our buffers + // are known to be heap based. + ByteBuff wrapByteBuf = ByteBuff.wrap(buffer); + try { + return new RecordDecoder(new DataInputStream(new ByteBuffInputStream(wrapByteBuf))); + } finally { + wrapByteBuf.release(); + } + } + + private static class RecordEncoder implements LogFile.Codec.Encoder { + private final DataOutput out; + private final ByteArrayOutputStream currentRecord; + + RecordEncoder(DataOutput out) { + this.out = out; + currentRecord = new ByteArrayOutputStream(); + } + + + @Override + public void write(LogFile.Record record) throws IOException { + + DataOutput recordOut = new DataOutputStream(currentRecord); + + // Write record fields + + Mutation mutation = record.getMutation(); + LogFileRecord.MutationType mutationType = LogFileRecord.MutationType.get(mutation); + recordOut.writeByte(mutationType.getCode()); + byte[] nameBytes = record.getHBaseTableName().getBytes(StandardCharsets.UTF_8); + WritableUtils.writeVInt(recordOut, nameBytes.length); + recordOut.write(nameBytes); + WritableUtils.writeVLong(recordOut, record.getCommitId()); + byte[] rowKey = mutation.getRow(); + WritableUtils.writeVInt(recordOut, rowKey.length); + recordOut.write(rowKey); + recordOut.writeLong(mutation.getTimestamp()); + + Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); + int cfCount = familyMap.size(); + WritableUtils.writeVInt(recordOut, cfCount); + + for (Map.Entry<byte[], List<Cell>> entry: familyMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + WritableUtils.writeVInt(recordOut, columnFamily.length); + recordOut.write(columnFamily); + List<Cell> cells = entry.getValue(); + WritableUtils.writeVInt(recordOut, cells.size()); + for (Cell cell: cells) { + WritableUtils.writeVInt(recordOut, cell.getQualifierLength()); + recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + WritableUtils.writeVInt(recordOut, cell.getValueLength()); + if (cell.getValueLength() > 0) { + recordOut.write(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength()); + } + } + } + + byte[] currentRecordBytes = currentRecord.toByteArray(); + // Write total record length + WritableUtils.writeVInt(out, currentRecordBytes.length); + // Write the record + out.write(currentRecordBytes); + + // Set the size (including the vint prefix) on the record object + ((LogFileRecord) record).setSerializedLength(currentRecordBytes.length + + WritableUtils.getVIntSize(currentRecordBytes.length)); + + // Reset the ByteArrayOutputStream to release resources + currentRecord.reset(); + } + } + + private static class RecordDecoder implements LogFile.Codec.Decoder { + private final DataInput in; + // A reference to the object populated by the last successful advance() + private LogFileRecord current = null; + + RecordDecoder(DataInput in) { + this.in = in; + } + + @Override + public boolean advance(LogFile.Record reuse) throws IOException { + try { + int recordDataLength = WritableUtils.readVInt(in); + recordDataLength += WritableUtils.getVIntSize(recordDataLength); + + // If we are reusing a record object, prepare it for new data + if (reuse == null || !(reuse instanceof LogFileRecord)) { + current = new LogFileRecord(); + } else { + current = (LogFileRecord) reuse; + } + // Set the total serialized length on the record + current.setSerializedLength(recordDataLength); + + LogFileRecord.MutationType type = + LogFileRecord.MutationType.codeToType(in.readByte()); + + int nameBytesLen = WritableUtils.readVInt(in); + byte[] nameBytes = new byte[nameBytesLen]; + in.readFully(nameBytes); + + current.setHBaseTableName(Bytes.toString(nameBytes)); + + current.setCommitId(WritableUtils.readVLong(in)); + + int rowKeyLen = WritableUtils.readVInt(in); + byte[] rowKey = new byte[rowKeyLen]; + in.readFully(rowKey); + + Mutation mutation; + switch (type) { + case PUT: + mutation = new Put(rowKey); + break; + case DELETE: + case DELETEFAMILYVERSION: + case DELETECOLUMN: + case DELETEFAMILY: + mutation = new Delete(rowKey); + break; + default: + throw new UnsupportedOperationException("Unhandled mutation type " + type); + } + current.setMutation(mutation); + + long ts = in.readLong(); + mutation.setTimestamp(ts); + + int colCount = WritableUtils.readVInt(in); Review Comment: cfCount rather than colCount ########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java: ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.compress.Compression; + +/** + * Defines the structure and constants for Phoenix Replication Log files. + * Provides interfaces for reading and writing these logs. + */ +public interface LogFile { + + /** Represents the file header */ + interface Header { + /** + * Gets the major version of the log file format. + * @return The major version number. + */ + int getMajorVersion(); + + /** + * Sets the major version of the log file format. + * @param majorVersion The major version number to set. + * @return This Header instance for chaining. + */ + Header setMajorVersion(int majorVersion); + + /** + * Gets the minor version of the log file format. + * @return The minor version number. + */ + int getMinorVersion(); + + /** + * Sets the minor version of the log file format. + * @param minorVersion The minor version number to set. + * @return This Header instance for chaining. + */ + Header setMinorVersion(int minorVersion); + + /** + * Gets the fixed serialized length of the header in bytes. + * @return The length of the serialized header. + */ + int getSerializedLength(); + + /** + * Reads the header fields from the provided DataInput stream. + * @param in The DataInput stream to read from. + * @throws IOException if an I/O error occurs or the header format is invalid. + */ + void readFields(DataInput in) throws IOException; + + /** + * Writes the header fields to the provided DataOutput stream. + * @param out The DataOutput stream to write to. + * @throws IOException if an I/O error occurs. + */ + void write(DataOutput out) throws IOException; + } + + /** Represents the header of a single block within the log file */ + interface BlockHeader { + /** + * Gets the version of this block header format. + * @return The block header version number. + */ + int getVersion(); + + /** + * Gets the compression algorithm used for the data payload of this block. + * @return The compression algorithm. + */ + Compression.Algorithm getDataCompression(); + + /** + * Sets the compression algorithm used for the data payload of this block. + * @param compression The compression algorithm to set. + * @return This BlockHeader instance for chaining. + */ + BlockHeader setDataCompression(Compression.Algorithm compression); + + /** + * Gets the size of the data payload before compression. + * @return The uncompressed data size in bytes. + */ + int getUncompressedDataSize(); + + /** + * Sets the size of the data payload before compression. + * @param uncompressedSize The uncompressed data size in bytes. + * @return This BlockHeader instance for chaining. + */ + BlockHeader setUncompressedDataSize(int uncompressedSize); + + /** + * Gets the size of the data payload after compression. If compression is NONE, + * this will be the same as the uncompressed size. + * @return The compressed data size in bytes. + */ + int getCompressedDataSize(); + + /** + * Sets the size of the data payload after compression. + * @param compressedSize The compressed data size in bytes. + * @return This BlockHeader instance for chaining. + */ + BlockHeader setCompressedDataSize(int compressedSize); + + /** + * Gets the fixed serialized length of the block header itself in bytes. + * @return The length of the serialized block header. + */ + int getSerializedHeaderLength(); + + /** + * Reads the block header fields from the provided DataInput stream. + * @param in The DataInput stream to read from. + * @throws IOException if an I/O error occurs or the block header format is invalid. + */ + void readFields(DataInput in) throws IOException; + + /** + * Writes the block header fields to the provided DataOutput stream. + * @param out The DataOutput stream to write to. + * @throws IOException if an I/O error occurs. + */ + void write(DataOutput out) throws IOException; + } + + /** Represents the file trailer */ + interface Trailer { + /** + * Gets the major version of the log file format stored in the trailer. + * Useful for validation when reading from the end of the file. + * @return The major version number. + */ + int getMajorVersion(); + + /** + * Sets the major version of the log file format in the trailer. + * @param majorVersion The major version number to set. + * @return This Trailer instance for chaining. + */ + Trailer setMajorVersion(int majorVersion); + + /** + * Gets the minor version of the log file format stored in the trailer. + * Useful for validation when reading from the end of the file. + * @return The minor version number. + */ + int getMinorVersion(); + + /** + * Sets the minor version of the log file format in the trailer. + * @param minorVersion The minor version number to set. + * @return This Trailer instance for chaining. + */ + Trailer setMinorVersion(int minorVersion); + + /** + * Gets the total number of records contained in the log file. + * @return The total record count. + */ + long getRecordCount(); + + /** + * Sets the total number of records contained in the log file. + * @param recordCount The total record count to set. + * @return This Trailer instance for chaining. + */ + Trailer setRecordCount(long recordCount); + + /** + * Gets the total number of blocks contained in the log file. + * @return The total block count. + */ + long getBlockCount(); + + /** + * Sets the total number of blocks contained in the log file. + * @param blockCount The total block count to set. + * @return This Trailer instance for chaining. + */ + Trailer setBlockCount(long blockCount); + + /** + * Gets the byte offset within the file where the first block begins (after the header). + * @return The starting offset of the first block. + */ + long getBlocksStartOffset(); + + /** + * Sets the byte offset within the file where the first block begins. + * @param offset The starting offset of the first block. + * @return This Trailer instance for chaining. + */ + Trailer setBlocksStartOffset(long offset); + + /** + * Gets the byte offset within the file where the trailer itself begins. + * @return The starting offset of the trailer. + */ + long getTrailerStartOffset(); Review Comment: Should this be called getTrailerMetadataStartOffset? ########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Context for LogFileReader. Uses Builder pattern. + */ +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2" }, + justification = "Intentional") +public class LogFileReaderContext { + + /** Configuration key for skipping corrupt blocks */ + public static final String LOGFILE_SKIP_CORRUPT_BLOCKS = + "phoenix.replication.logfile.skip.corrupt.blocks"; + /** Default for skipping corrupt blocks */ + public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true; + + private final Configuration conf; + private FileSystem fs; + private Path path; + private LogFileCodec codec; + private long fileSize = -1; + private boolean isSkipCorruptBlocks; + private AtomicLong blocksRead = new AtomicLong(); Review Comment: Do we need them to be Atomic? This may give the impression LogFileReader is thread safe. We expect a single reader thread reading a log file, don't we? ########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileCodec.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * Default Codec for encoding and decoding ReplicationLog Records within a block buffer. + * This implementation uses standard Java DataInput/DataOutput for serialization. + * + * Record Format within a block: + * <pre> + * +--------------------------------------------+ + * | RECORD LENGTH (vint) | + * +--------------------------------------------+ + * | RECORD HEADER | + * | - Mutation type (byte) | + * | - HBase table name length (vint) | + * | - HBase table name (byte[]) | + * | - Transaction/SCN or commit ID (vint) | + * +--------------------------------------------+ + * | ROW KEY LENGTH (vint) | + * | ROW KEY (byte[]) | + * +--------------------------------------------+ + * | MUTATION TIMESTAMP (vint) | + * +--------------------------------------------+ + * | NUMBER OF COLUMN FAMILIES CHANGED (vint) | + * +--------------------------------------------+ + * | PER-FAMILY DATA (repeated) | + * | +--------------------------------------+ | + * | | COLUMN FAMILY NAME LENGTH (vint) | | + * | | COLUMN FAMILY NAME (byte[]) | | + * | | NUMBER OF CELLS IN FAMILY (vint) | | + * | +--------------------------------------+ | + * | | PER-CELL DATA (repeated) | | + * | | +––––––––––––----------------–--–+ | | + * | | | COLUMN QUALIFIER LENGTH (vint) | | | + * | | | COLUMN QUALIFIER (byte[]) | | | + * | | | VALUE LENGTH (vint) | | | + * | | | VALUE (byte[]) | | | + * | | +–––––––––––––--------------––--–+ | | + * | +--------------------------------------+ | + * +--------------------------------------------+ + * </pre> + */ +public class LogFileCodec implements LogFile.Codec { + + @Override + public Encoder getEncoder(DataOutput out) { + return new RecordEncoder(out); + } + + @Override + public Decoder getDecoder(DataInput in) { + return new RecordDecoder(in); + } + + @Override + public Decoder getDecoder(ByteBuffer buffer) { + // We use HBase ByteBuff and ByteBuffInputStream which avoids copying, because our buffers + // are known to be heap based. + ByteBuff wrapByteBuf = ByteBuff.wrap(buffer); + try { + return new RecordDecoder(new DataInputStream(new ByteBuffInputStream(wrapByteBuf))); + } finally { + wrapByteBuf.release(); + } + } + + private static class RecordEncoder implements LogFile.Codec.Encoder { + private final DataOutput out; + private final ByteArrayOutputStream currentRecord; + + RecordEncoder(DataOutput out) { + this.out = out; + currentRecord = new ByteArrayOutputStream(); + } + + + @Override + public void write(LogFile.Record record) throws IOException { + + DataOutput recordOut = new DataOutputStream(currentRecord); + + // Write record fields + + Mutation mutation = record.getMutation(); + LogFileRecord.MutationType mutationType = LogFileRecord.MutationType.get(mutation); + recordOut.writeByte(mutationType.getCode()); + byte[] nameBytes = record.getHBaseTableName().getBytes(StandardCharsets.UTF_8); + WritableUtils.writeVInt(recordOut, nameBytes.length); + recordOut.write(nameBytes); + WritableUtils.writeVLong(recordOut, record.getCommitId()); + byte[] rowKey = mutation.getRow(); + WritableUtils.writeVInt(recordOut, rowKey.length); + recordOut.write(rowKey); + recordOut.writeLong(mutation.getTimestamp()); + + Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); + int cfCount = familyMap.size(); + WritableUtils.writeVInt(recordOut, cfCount); + + for (Map.Entry<byte[], List<Cell>> entry: familyMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + WritableUtils.writeVInt(recordOut, columnFamily.length); + recordOut.write(columnFamily); + List<Cell> cells = entry.getValue(); + WritableUtils.writeVInt(recordOut, cells.size()); + for (Cell cell: cells) { + WritableUtils.writeVInt(recordOut, cell.getQualifierLength()); + recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + WritableUtils.writeVInt(recordOut, cell.getValueLength()); + if (cell.getValueLength() > 0) { + recordOut.write(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength()); + } + } + } + + byte[] currentRecordBytes = currentRecord.toByteArray(); + // Write total record length + WritableUtils.writeVInt(out, currentRecordBytes.length); + // Write the record + out.write(currentRecordBytes); + + // Set the size (including the vint prefix) on the record object + ((LogFileRecord) record).setSerializedLength(currentRecordBytes.length + + WritableUtils.getVIntSize(currentRecordBytes.length)); + + // Reset the ByteArrayOutputStream to release resources + currentRecord.reset(); + } + } + + private static class RecordDecoder implements LogFile.Codec.Decoder { + private final DataInput in; + // A reference to the object populated by the last successful advance() + private LogFileRecord current = null; + + RecordDecoder(DataInput in) { + this.in = in; + } + + @Override + public boolean advance(LogFile.Record reuse) throws IOException { + try { + int recordDataLength = WritableUtils.readVInt(in); Review Comment: Instead of using an EOFException to determine the end of a block, should we keep track of the position within a block to improve CPU utilization? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org