loserwang1024 commented on code in PR #2189:
URL: https://github.com/apache/fluss/pull/2189#discussion_r2625411759
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java:
##########
@@ -163,4 +163,64 @@ void testPrimaryKeyTableRecord() {
flussRecordAsPaimonRow.setFlussRecord(logRecord);
assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE);
}
+
+ @Test
+ void testPaimonSchemaWiderThanFlussRecord() {
+ int tableBucket = 0;
+ RowType tableRowType =
+ RowType.of(
+ new org.apache.paimon.types.BooleanType(),
+ new org.apache.paimon.types.VarCharType(),
+            // append three system columns: __bucket, __offset, __timestamp
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.BigIntType(),
+                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+
+ FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+ new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+ long logOffset = 7L;
+ long timeStamp = System.currentTimeMillis();
+ GenericRow genericRow = new GenericRow(1);
+ genericRow.setField(0, true);
+        LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
+ flussRecordAsPaimonRow.setFlussRecord(logRecord);
+
+ assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
+
+ assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
+ assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
+ assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
+ assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
+ assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
+ }
+
+ @Test
+ void testFlussRecordWiderThanPaimonSchema() {
Review Comment:
Please test the case where the Fluss record is narrower than the Paimon schema. Currently, each time we read a round of a table's splits, the reader first gets the table info from Fluss and only later gets the table info from the Paimon catalog. If a column is added in between, the Paimon schema is wider than the Fluss record, and we need to fill the missing fields with null.
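For instance, a minimal sketch of the requested test, reusing the types and helpers from testPaimonSchemaWiderThanFlussRecord above (it largely mirrors that test; the VarCharType column stands in for a column that was added on the Paimon side only):

    @Test
    void testFlussRecordNarrowerThanPaimonSchema() {
        // Paimon schema: BOOLEAN written by Fluss, VARCHAR added later on the
        // Paimon side, plus the __bucket, __offset, __timestamp system columns.
        RowType tableRowType =
                RowType.of(
                        new org.apache.paimon.types.BooleanType(),
                        new org.apache.paimon.types.VarCharType(),
                        new org.apache.paimon.types.IntType(),
                        new org.apache.paimon.types.BigIntType(),
                        new org.apache.paimon.types.LocalZonedTimestampType(3));
        FlussRecordAsPaimonRow row = new FlussRecordAsPaimonRow(0, tableRowType);

        // The Fluss record only carries the first data column.
        GenericRow genericRow = new GenericRow(1);
        genericRow.setField(0, true);
        row.setFlussRecord(new GenericRecord(7L, System.currentTimeMillis(), APPEND_ONLY, genericRow));

        // The column that exists only in the Paimon schema must read as null.
        assertThat(row.getBoolean(0)).isTrue();
        assertThat(row.isNullAt(1)).isTrue();
    }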
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -27,35 +27,38 @@
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
+ private final int businessFieldCount;
Review Comment:
maybe dataFieldCount?
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -27,35 +27,38 @@
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
+ private final int businessFieldCount;
+ private final int bucketFieldIndex;
+ private final int offsetFieldIndex;
+ private final int timestampFieldIndex;
public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
super(tableTowType);
this.bucket = bucket;
+        this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
+ this.bucketFieldIndex = businessFieldCount;
+ this.offsetFieldIndex = businessFieldCount + 1;
+ this.timestampFieldIndex = businessFieldCount + 2;
}
public void setFlussRecord(LogRecord logRecord) {
this.logRecord = logRecord;
this.internalRow = logRecord.getRow();
- this.originRowFieldCount = internalRow.getFieldCount();
- checkState(
-                originRowFieldCount == tableRowType.getFieldCount() - SYSTEM_COLUMNS.size(),
-                "The paimon table fields count must equals to LogRecord's fields count.");
+        this.originRowFieldCount = Math.min(internalRow.getFieldCount(), businessFieldCount);
Review Comment:
If businessFieldCount is smaller than internalRow.getFieldCount(), just throw an exception, because data would be lost. To be honest, businessFieldCount must not be less than internalRow.getFieldCount(), because we only support adding columns at the end for now.
The reason is as follows:
Currently, each time we read a round of a table's splits, the reader first gets the table info from Fluss and only later gets the table info from the Paimon catalog. If a column is added in between, the Paimon schema is wider than the Fluss record, and we need to fill the missing fields with null.
Maybe you can refer to what PaddingRow does. (It is similar: when writing data to Fluss, the Flink data may be narrower than the Fluss schema.)
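For example, a minimal sketch of what the suggested change to setFlussRecord could look like (the exception type and message are illustrative assumptions, and it relies on the field accessors treating data positions at or beyond originRowFieldCount as null):

    public void setFlussRecord(LogRecord logRecord) {
        this.logRecord = logRecord;
        this.internalRow = logRecord.getRow();
        int flussFieldCount = internalRow.getFieldCount();
        // Only appending columns at the end is supported, so the Fluss record
        // can never legally carry more data fields than the Paimon schema; a
        // wider record means data would be silently dropped, so fail fast.
        if (flussFieldCount > businessFieldCount) {
            throw new IllegalStateException(
                    "Fluss record has " + flussFieldCount
                            + " fields, but the Paimon table only has "
                            + businessFieldCount + " data fields; data would be lost.");
        }
        // If the Paimon schema is wider (a column was added between fetching
        // the Fluss table info and the Paimon table info), the trailing data
        // columns the record does not carry should be read as null.
        this.originRowFieldCount = flussFieldCount;
    }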