loserwang1024 commented on code in PR #2189:
URL: https://github.com/apache/fluss/pull/2189#discussion_r2625417160
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -27,35 +27,38 @@
import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
+ private final int businessFieldCount;
+ private final int bucketFieldIndex;
+ private final int offsetFieldIndex;
+ private final int timestampFieldIndex;
public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
super(tableTowType);
this.bucket = bucket;
+ this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
+ this.bucketFieldIndex = businessFieldCount;
+ this.offsetFieldIndex = businessFieldCount + 1;
+ this.timestampFieldIndex = businessFieldCount + 2;
}
public void setFlussRecord(LogRecord logRecord) {
this.logRecord = logRecord;
this.internalRow = logRecord.getRow();
- this.originRowFieldCount = internalRow.getFieldCount();
- checkState(
- originRowFieldCount == tableRowType.getFieldCount() - SYSTEM_COLUMNS.size(),
- "The paimon table fields count must equals to LogRecord's fields count.");
+ this.originRowFieldCount = Math.min(internalRow.getFieldCount(), businessFieldCount);
Review Comment:
If businessFieldCount is smaller than internalRow.getFieldCount(), just throw an exception, because the extra fields would be silently truncated, which is dangerous. In fact, businessFieldCount must never be less than internalRow.getFieldCount(), because we currently only support adding columns at the end.
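A minimal sketch of the fail-fast check suggested above, written as plain Java so it is self-contained. `FieldCountCheck` and `resolveOriginFieldCount` are hypothetical names, not Fluss APIs; in the PR itself, the `checkState` helper from `org.apache.fluss.utils.Preconditions` (whose import this diff removes) would likely be the natural fit.

```java
/** Hypothetical helper illustrating the suggested check; not part of Fluss. */
final class FieldCountCheck {

    private FieldCountCheck() {}

    /**
     * Returns how many business fields to read from the Fluss record.
     *
     * @param recordFieldCount field count of the incoming Fluss row
     * @param businessFieldCount Paimon field count minus the system columns
     * @throws IllegalStateException if the record is wider than the Paimon schema,
     *     because reading only a prefix of it would silently drop data
     */
    static int resolveOriginFieldCount(int recordFieldCount, int businessFieldCount) {
        if (recordFieldCount > businessFieldCount) {
            throw new IllegalStateException(
                    String.format(
                            "Fluss record has %d fields but the Paimon table only has %d "
                                    + "business fields; refusing to truncate data.",
                            recordFieldCount, businessFieldCount));
        }
        // Equal counts: schemas match. Smaller count: the Paimon schema is wider
        // (a column was added at the end), so the caller pads the tail with null.
        return recordFieldCount;
    }
}
```

Unlike Math.min, this keeps the narrower-record case working while turning the wider-record case into an explicit error.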
The reason is as follows:
Currently, each time we read a round of a table's splits, the reader first fetches the table info from Fluss and then fetches the table info from the Paimon catalog. If a column is added between these two steps, the Paimon schema is wider than the Fluss record, so we need to fill the missing fields with null.
Maybe you can refer to what PaddingRow does. (It handles a similar case: when writing data to Fluss, the Flink data may be narrower than the Fluss schema.)
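The padding idea can be sketched as follows, assuming the layout from the diff above: business fields at positions [0, businessFieldCount), followed by bucket, offset and timestamp as the three system columns. `SimpleRow`, `ListRow` and `PaddedRecordRow` are hypothetical stand-ins, not the real Fluss `PaddingRow` or Paimon `InternalRow` APIs, which expose typed getters rather than a generic `Object getField(int)`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical minimal row abstraction used only for this illustration. */
interface SimpleRow {
    int getFieldCount();

    Object getField(int pos);
}

/** Hypothetical list-backed row standing in for a Fluss record's row. */
final class ListRow implements SimpleRow {
    private final List<Object> values;

    ListRow(Object... values) {
        this.values = Arrays.asList(values);
    }

    @Override
    public int getFieldCount() {
        return values.size();
    }

    @Override
    public Object getField(int pos) {
        return values.get(pos);
    }
}

/**
 * Exposes a possibly narrower record as businessFieldCount business fields followed by
 * the system columns bucket, offset and timestamp. Missing trailing business fields read as null.
 */
final class PaddedRecordRow implements SimpleRow {
    private static final int SYSTEM_COLUMN_COUNT = 3; // bucket, offset, timestamp

    private final int businessFieldCount;
    private final int bucket;
    private SimpleRow record;
    private long offset;
    private long timestamp;

    PaddedRecordRow(int businessFieldCount, int bucket) {
        this.businessFieldCount = businessFieldCount;
        this.bucket = bucket;
    }

    /** Must be called before getField; rejects records wider than the schema. */
    void setRecord(SimpleRow record, long offset, long timestamp) {
        if (record.getFieldCount() > businessFieldCount) {
            throw new IllegalStateException(
                    "Record is wider than the Paimon schema; refusing to truncate.");
        }
        this.record = record;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override
    public int getFieldCount() {
        return businessFieldCount + SYSTEM_COLUMN_COUNT;
    }

    @Override
    public Object getField(int pos) {
        if (pos < record.getFieldCount()) {
            return record.getField(pos); // business field carried by the record
        }
        if (pos < businessFieldCount) {
            return null; // column the record lacks (e.g. added after Fluss info was read)
        }
        if (pos == businessFieldCount) {
            return bucket;
        }
        if (pos == businessFieldCount + 1) {
            return offset;
        }
        if (pos == businessFieldCount + 2) {
            return timestamp;
        }
        throw new IndexOutOfBoundsException("Field position out of range: " + pos);
    }
}
```

Returning null for the missing trailing business fields keeps a record read with the older Fluss schema usable under the wider Paimon schema, while the check in setRecord still refuses records wider than the schema.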
--
This is an automated message from the Apache Git Service.