Copilot commented on code in PR #2166: URL: https://github.com/apache/fluss/pull/2166#discussion_r2646514970
########## fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRowTest.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.row.InternalArray; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonRowAsFlussRow}. 
*/ +class PaimonRowAsFlussRowTest { + + @Test + void testArrayWithAllTypes() { + GenericRow paimonRow = new GenericRow(16); + // Primitive types + paimonRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + paimonRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + paimonRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + paimonRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + paimonRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + paimonRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + paimonRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + // String type + paimonRow.setField( + 7, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + // Decimal type + paimonRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("999.99"), 10, 2) + })); + // Timestamp type + paimonRow.setField( + 9, + new GenericArray( + new Object[] { + Timestamp.fromEpochMillis(1698235273182L), + Timestamp.fromEpochMillis(1698235274000L), + Timestamp.fromEpochMillis(1698235275000L) + })); + // TimestampLTZ type + paimonRow.setField( + 10, + new GenericArray( + new Object[] { + Timestamp.fromEpochMillis(1698235273182L), + Timestamp.fromEpochMillis(1698235274000L), + Timestamp.fromEpochMillis(1698235275000L) + })); + // Binary type + paimonRow.setField( + 11, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7}, + new byte[] {8, 9, 10, 11, 12} + })); + // array<array<int>> type + paimonRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + // System columns: __bucket, __offset, __timestamp + 
paimonRow.setField(13, 0); + paimonRow.setField(14, 0L); + paimonRow.setField(15, 0L); + + PaimonRowAsFlussRow flussRow = new PaimonRowAsFlussRow(paimonRow); + + // Test boolean array + InternalArray boolArray = flussRow.getArray(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRow.getArray(3); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRow.getArray(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRow.getArray(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRow.getArray(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Test string array + InternalArray stringArray = flussRow.getArray(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.getString(0).toString()).isEqualTo("hello"); + assertThat(stringArray.getString(1).toString()).isEqualTo("world"); + assertThat(stringArray.getString(2).toString()).isEqualTo("test"); + + // Test decimal array + InternalArray decimalArray = flussRow.getArray(8); + 
assertThat(decimalArray.size()).isEqualTo(3); + assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("678.90")); + assertThat(decimalArray.getDecimal(2, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("999.99")); + + // Test timestamp array + InternalArray timestampArray = flussRow.getArray(9); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestampNtz(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestampNtz(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestampNtz(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // Test timestamp_ltz array + InternalArray timestampLtzArray = flussRow.getArray(10); + assertThat(timestampLtzArray.size()).isEqualTo(3); + assertThat(timestampLtzArray.getTimestampLtz(0, 3).getEpochMillisecond()) + .isEqualTo(1698235273182L); + assertThat(timestampLtzArray.getTimestampLtz(1, 3).getEpochMillisecond()) + .isEqualTo(1698235274000L); + assertThat(timestampLtzArray.getTimestampLtz(2, 3).getEpochMillisecond()) + .isEqualTo(1698235275000L); + + // Test binary array + InternalArray binaryArray = flussRow.getArray(11); + assertThat(binaryArray.size()).isEqualTo(3); + assertThat(binaryArray.getBinary(0, 3)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBinary(1, 4)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBinary(2, 5)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + // Also test getBytes method + assertThat(binaryArray.getBytes(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBytes(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBytes(2)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + + // Also test array<int> array Review Comment: The comment says "Also test array<int> array" but should say "Also test array<array<int>>" or 
"Also test nested array" to accurately describe that it's testing a two-dimensional array type. ```suggestion // Also test array<array<int>> (nested int array) ``` ########## fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java: ########## @@ -140,4 +142,195 @@ void testPrimaryKeyTableRecord() { assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind()) .isEqualTo(RowKind.INSERT); } + + @Test + void testArrayWithAllTypes() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BooleanType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TinyIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.SmallIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.BigIntType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.FloatType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DoubleType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarCharType(true, 30)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.DecimalType(10, 2)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.TimestampType(3)), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.LocalZonedTimestampType()), + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.VarBinaryType()), + // array<array<int>> + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.ArrayType( + new org.apache.paimon.types.IntType()))); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(13); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new 
GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new Object[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + // String type + genericRow.setField( + 7, + new GenericArray( + new BinaryString[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + + // Decimal type + genericRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("999.99"), 10, 2) + })); + + // Timestamp type + genericRow.setField( + 9, + new GenericArray( + new Object[] { + TimestampNtz.fromMillis(1698235273182L), + TimestampNtz.fromMillis(1698235274000L), + TimestampNtz.fromMillis(1698235275000L) + })); + + // TimestampLTZ type + genericRow.setField( + 10, + new GenericArray( + new Object[] { + TimestampLtz.fromEpochMillis(1698235273182L), + TimestampLtz.fromEpochMillis(1698235274000L), + TimestampLtz.fromEpochMillis(1698235275000L) + })); + + // Binary type + genericRow.setField( + 11, + new GenericArray( + new Object[] { + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6, 7}, + new byte[] {8, 9, 10, 11, 12} + })); + + // array<array<int>> type + genericRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + // Test boolean array + InternalArray boolArray = flussRowAsPaimonRow.getArray(0); + 
assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.toBooleanArray()).isEqualTo(new boolean[] {true, false, true}); + + // Test byte array + InternalArray byteArray = flussRowAsPaimonRow.getArray(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.toByteArray()).isEqualTo(new byte[] {1, 2, 3}); + + // Test short array + InternalArray shortArray = flussRowAsPaimonRow.getArray(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.toShortArray()).isEqualTo(new short[] {100, 200, 300}); + + // Test int array + InternalArray intArray = flussRowAsPaimonRow.getArray(3); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.toIntArray()).isEqualTo(new int[] {1000, 2000, 3000}); + + // Test long array + InternalArray longArray = flussRowAsPaimonRow.getArray(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.toLongArray()).isEqualTo(new long[] {10000L, 20000L, 30000L}); + + // Test float array + InternalArray floatArray = flussRowAsPaimonRow.getArray(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.toFloatArray()).isEqualTo(new float[] {1.1f, 2.2f, 3.3f}); + + // Test double array + InternalArray doubleArray = flussRowAsPaimonRow.getArray(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.toDoubleArray()).isEqualTo(new double[] {1.11, 2.22, 3.33}); + + // Test string array + InternalArray stringArray = flussRowAsPaimonRow.getArray(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.getString(0).toString()).isEqualTo("hello"); + assertThat(stringArray.getString(1).toString()).isEqualTo("world"); + assertThat(stringArray.getString(2).toString()).isEqualTo("test"); + + // Test decimal array + InternalArray decimalArray = flussRowAsPaimonRow.getArray(8); + assertThat(decimalArray.size()).isEqualTo(3); + assertThat(decimalArray.getDecimal(0, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("123.45")); + 
assertThat(decimalArray.getDecimal(1, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("678.90")); + assertThat(decimalArray.getDecimal(2, 10, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("999.99")); + + // Test timestamp array + InternalArray timestampArray = flussRowAsPaimonRow.getArray(9); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestamp(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // test timestamp_ltz array + timestampArray = flussRowAsPaimonRow.getArray(10); + assertThat(timestampArray.size()).isEqualTo(3); + assertThat(timestampArray.getTimestamp(0, 3).getMillisecond()).isEqualTo(1698235273182L); + assertThat(timestampArray.getTimestamp(1, 3).getMillisecond()).isEqualTo(1698235274000L); + assertThat(timestampArray.getTimestamp(2, 3).getMillisecond()).isEqualTo(1698235275000L); + + // Test binary array + InternalArray binaryArray = flussRowAsPaimonRow.getArray(11); + assertThat(binaryArray.size()).isEqualTo(3); + assertThat(binaryArray.getBinary(0)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(binaryArray.getBinary(1)).isEqualTo(new byte[] {4, 5, 6, 7}); + assertThat(binaryArray.getBinary(2)).isEqualTo(new byte[] {8, 9, 10, 11, 12}); + + // Also test array<int> array Review Comment: The comment says "Also test array<int> array" but should say "Also test array<array<int>>" or "Also test nested array" to accurately describe that it's testing a two-dimensional array type. ```suggestion // Also test array<array<int>> (nested int array) ``` ########## fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.source; + +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; + +/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. 
*/ +public class FlussArrayAsPaimonArray implements InternalArray { + + private final org.apache.fluss.row.InternalArray flussArray; + private final DataType elementType; + + public FlussArrayAsPaimonArray( + org.apache.fluss.row.InternalArray flussArray, DataType elementType) { + this.flussArray = flussArray; + this.elementType = elementType; + } + + @Override + public int size() { + return flussArray.size(); + } + + @Override + public boolean isNullAt(int pos) { + return flussArray.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return flussArray.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return flussArray.getByte(pos); + } + + @Override + public short getShort(int pos) { + return flussArray.getShort(pos); + } + + @Override + public int getInt(int pos) { + return flussArray.getInt(pos); + } + + @Override + public long getLong(int pos) { + return flussArray.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return flussArray.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return flussArray.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return BinaryString.fromBytes(flussArray.getString(pos).toBytes()); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + org.apache.fluss.row.Decimal flussDecimal = flussArray.getDecimal(pos, precision, scale); + if (flussDecimal.isCompact()) { + return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale); + } else { + return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale); + } + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + // Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays + switch (elementType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (TimestampNtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampNtz(pos, precision).getMillisecond()); + } 
else { + TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); + } + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (TimestampLtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + flussArray.getTimestampLtz(pos, precision).getEpochMillisecond()); + } else { + TimestampLtz timestampLtz = flussArray.getTimestampLtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampLtz.getEpochMillisecond(), + timestampLtz.getNanoOfMillisecond()); + } + default: + throw new UnsupportedOperationException( + "Unsupported data type to get timestamp: " + elementType); Review Comment: The getTimestamp method supports only TIMESTAMP_WITHOUT_TIME_ZONE and TIMESTAMP_WITH_LOCAL_TIME_ZONE array element types; any other element type falls through to the default clause. That clause does throw an UnsupportedOperationException, but its message only prints the element type and does not say which element types are supported for arrays. Making the message explicit would make misconfigured schemas easier to diagnose. ```suggestion "Unsupported array element type for getTimestamp. " + "Only TIMESTAMP_WITHOUT_TIME_ZONE and " + "TIMESTAMP_WITH_LOCAL_TIME_ZONE are supported, but got: " + elementType.getTypeRoot() + " (" + elementType + ")."); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
