loserwang1024 commented on code in PR #2189:
URL: https://github.com/apache/fluss/pull/2189#discussion_r2625411759


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRowTest.java:
##########
@@ -163,4 +163,64 @@ void testPrimaryKeyTableRecord() {
         flussRecordAsPaimonRow.setFlussRecord(logRecord);
         assertThat(flussRecordAsPaimonRow.getRowKind()).isEqualTo(RowKind.DELETE);
     }
+
+    @Test
+    void testPaimonSchemaWiderThanFlussRecord() {
+        int tableBucket = 0;
+        RowType tableRowType =
+                RowType.of(
+                        new org.apache.paimon.types.BooleanType(),
+                        new org.apache.paimon.types.VarCharType(),
+                        // append three system columns: __bucket, __offset, __timestamp
+                        new org.apache.paimon.types.IntType(),
+                        new org.apache.paimon.types.BigIntType(),
+                        new org.apache.paimon.types.LocalZonedTimestampType(3));
+
+        FlussRecordAsPaimonRow flussRecordAsPaimonRow =
+                new FlussRecordAsPaimonRow(tableBucket, tableRowType);
+
+        long logOffset = 7L;
+        long timeStamp = System.currentTimeMillis();
+        GenericRow genericRow = new GenericRow(1);
+        genericRow.setField(0, true);
+        LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
+        flussRecordAsPaimonRow.setFlussRecord(logRecord);
+
+        assertThat(flussRecordAsPaimonRow.getFieldCount()).isEqualTo(5);
+
+        assertThat(flussRecordAsPaimonRow.getBoolean(0)).isTrue();
+        assertThat(flussRecordAsPaimonRow.isNullAt(1)).isTrue();
+        assertThat(flussRecordAsPaimonRow.getInt(2)).isEqualTo(tableBucket);
+        assertThat(flussRecordAsPaimonRow.getLong(3)).isEqualTo(logOffset);
+        assertThat(flussRecordAsPaimonRow.getLong(4)).isEqualTo(timeStamp);
+    }
+
+    @Test
+    void testFlussRecordWiderThanPaimonSchema() {

Review Comment:
   Please also test the case where the Fluss record is narrower than the Paimon schema. Currently, each time we read a round of a table's splits, the reader first gets the table info from Fluss and only later gets the table info from the Paimon catalog. If a column is added in between, the Paimon schema ends up wider than the Fluss record, and we need to fill the missing fields with null.
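   The padding behavior described above can be sketched like this (a hypothetical, simplified stand-in for `FlussRecordAsPaimonRow`, not the actual implementation: any position beyond the record's own fields reads as null):

```java
// Minimal sketch: a row wrapper over a target schema that may be wider than
// the source record; trailing positions missing from the record read as null.
public class PaddingRowSketch {
    private final Object[] fields;   // fields actually present in the record
    private final int targetWidth;   // field count of the (possibly wider) schema

    PaddingRowSketch(Object[] fields, int targetWidth) {
        if (fields.length > targetWidth) {
            // the record must never be wider than the schema, or data is lost
            throw new IllegalStateException("record wider than schema");
        }
        this.fields = fields;
        this.targetWidth = targetWidth;
    }

    boolean isNullAt(int pos) {
        // positions beyond the record's own fields are padded with null
        return pos >= fields.length || fields[pos] == null;
    }

    Object getField(int pos) {
        return pos < fields.length ? fields[pos] : null;
    }

    int getFieldCount() {
        return targetWidth;
    }

    public static void main(String[] args) {
        // record has 1 field, schema has 2 data fields: field 1 pads to null
        PaddingRowSketch row = new PaddingRowSketch(new Object[] {true}, 2);
        System.out.println(row.getField(0)); // true
        System.out.println(row.isNullAt(1)); // true
    }
}
```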



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -27,35 +27,38 @@
 
 import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
 
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
+    private final int businessFieldCount;

Review Comment:
   maybe dataFieldCount?



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java:
##########
@@ -27,35 +27,38 @@
 
 import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
 public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
 
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
+    private final int businessFieldCount;
+    private final int bucketFieldIndex;
+    private final int offsetFieldIndex;
+    private final int timestampFieldIndex;
 
     public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
         super(tableTowType);
         this.bucket = bucket;
+        this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
+        this.bucketFieldIndex = businessFieldCount;
+        this.offsetFieldIndex = businessFieldCount + 1;
+        this.timestampFieldIndex = businessFieldCount + 2;
     }
 
     public void setFlussRecord(LogRecord logRecord) {
         this.logRecord = logRecord;
         this.internalRow = logRecord.getRow();
-        this.originRowFieldCount = internalRow.getFieldCount();
-        checkState(
-                originRowFieldCount == tableRowType.getFieldCount() - SYSTEM_COLUMNS.size(),
-                "The paimon table fields count must equals to LogRecord's fields count.");
+        this.originRowFieldCount = Math.min(internalRow.getFieldCount(), businessFieldCount);

Review Comment:
   If businessFieldCount is smaller than internalRow.getFieldCount(), just throw an exception, because data would be lost. In fact, businessFieldCount must never be less than internalRow.getFieldCount(), because for now we only support adding columns at the end.
   
   The reason is as follows:
   Currently, each time we read a round of a table's splits, the reader first gets the table info from Fluss and only later gets the table info from the Paimon catalog. If a column is added in between, the Paimon schema ends up wider than the Fluss record, and we need to fill the missing fields with null.
   
   Maybe you can refer to what PaddingRow does. (It handles a similar case: when writing data to Fluss, the Flink row may be narrower than the Fluss schema.)
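   The suggested guard could look roughly like this (a hypothetical sketch using a plain `IllegalStateException` in place of Fluss's `checkState`; the helper name `resolveFieldCount` is made up for illustration):

```java
// Hypothetical sketch of the suggested setFlussRecord guard: reject records
// wider than the data-field portion of the Paimon schema (data would be
// lost), but allow narrower records, whose missing trailing fields will
// later be read as null.
public class SetRecordGuardSketch {
    static int resolveFieldCount(int recordFieldCount, int businessFieldCount) {
        if (recordFieldCount > businessFieldCount) {
            // a wider record means the Paimon schema is missing columns: fail fast
            throw new IllegalStateException(
                    "LogRecord has " + recordFieldCount
                            + " fields, but the paimon table only has "
                            + businessFieldCount + " data fields.");
        }
        // narrower is fine: columns may have been added to the Paimon table
        // after the Fluss table info was fetched
        return recordFieldCount;
    }

    public static void main(String[] args) {
        System.out.println(resolveFieldCount(1, 2)); // 1 (record narrower: ok)
        try {
            resolveFieldCount(3, 2);                 // record wider: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```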



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
