This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit ec5078e112173fe5ec586738df7d7dfb508b02a3 Author: Junbo Wang <[email protected]> AuthorDate: Thu Jan 29 20:18:27 2026 +0800 [kv] Implement lookup with insertIfNotExists on tablet server (#2485) --- .../java/org/apache/fluss/metadata/Schema.java | 11 + .../org/apache/fluss/record/KeyRecordBatch.java | 251 ++++++++++++++++ .../org/apache/fluss/record/KvRecordBatch.java | 4 + .../apache/fluss/record/KvRecordReadContext.java | 5 + .../fluss/row/decode/CompactedKeyDecoder.java | 84 ++++++ .../org/apache/fluss/row/decode/KeyDecoder.java | 79 +++++ .../row/decode/iceberg/IcebergKeyDecoder.java | 143 +++++++++ .../fluss/row/decode/paimon/PaimonKeyDecoder.java | 210 ++++++++++++++ .../row/encode/paimon/PaimonBinaryRowWriter.java | 1 + .../apache/fluss/record/KeyRecordBatchTest.java | 194 +++++++++++++ .../java/org/apache/fluss/record/TestData.java | 28 ++ .../fluss/row/decode/CompactedKeyDecoderTest.java | 319 +++++++++++++++++++++ .../row/decode/iceberg/IcebergKeyDecoderTest.java | 212 ++++++++++++++ .../row/decode/paimon/PaimonKeyDecoderTest.java | 227 +++++++++++++++ .../row/encode/paimon/PaimonKeyEncoderTest.java | 4 +- .../org/apache/fluss/testutils/DataTestUtils.java | 14 + fluss-rpc/src/main/proto/FlussApi.proto | 3 + .../java/org/apache/fluss/server/kv/KvTablet.java | 23 +- .../fluss/server/replica/ReplicaManager.java | 151 +++++++++- .../apache/fluss/server/tablet/TabletService.java | 33 ++- .../fluss/server/replica/ReplicaManagerTest.java | 300 +++++++++++++++++++ .../fluss/server/replica/ReplicaTestBase.java | 10 + .../fluss/server/tablet/TabletServiceITCase.java | 99 +++++++ .../server/testutils/RpcMessageTestUtils.java | 21 ++ fluss-test-coverage/pom.xml | 1 + 25 files changed, 2411 insertions(+), 16 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 92b675c42..a97f09ee9 100644 --- 
a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -151,6 +151,17 @@ public final class Schema implements Serializable { .orElseGet(() -> new int[0]); } + /** + * Returns column indexes excluding auto-increment columns, used for partial update operations. + */ + public @Nullable int[] getNonAutoIncrementColumnIndexes() { + if (autoIncrementColumnNames.isEmpty()) { + return null; + } + int autoIncIdx = getColumnIndexes(autoIncrementColumnNames)[0]; + return IntStream.range(0, columns.size()).filter(i -> i != autoIncIdx).toArray(); + } + /** Returns the auto-increment columnIds, if any, otherwise returns an empty array. */ public int[] getAutoIncrementColumnIds() { if (autoIncrementColumnNames.isEmpty()) { diff --git a/fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java new file mode 100644 index 000000000..6cf684ee1 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/KeyRecordBatch.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.record; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.TableConfig; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.SchemaGetter; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.decode.KeyDecoder; +import org.apache.fluss.row.encode.RowEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; + +/** + * A lazy-decoding KvRecordBatch that stores key bytes and converts them to KvRecords on-demand + * during iteration. + * + * <p>When iterating via {@link #records(ReadContext)}, each key is decoded and a full row is + * constructed with key fields populated and non-key fields set to null. This avoids upfront + * full-batch decoding, improving memory efficiency. 
+ */ +public class KeyRecordBatch implements KvRecordBatch { + + private final List<byte[]> keyBytes; + private final short schemaId; + private final KvFormat kvFormat; + private final short kvFormatVersion; + private final boolean defaultBucketKey; + private final @Nullable DataLakeFormat lakeFormat; + + public static KeyRecordBatch create(List<byte[]> keyBytes, TableInfo tableInfo) { + TableConfig tableConfig = tableInfo.getTableConfig(); + KvFormat kvFormat = tableConfig.getKvFormat(); + short kvFormatVersion = tableConfig.getKvFormatVersion().orElse(1).shortValue(); + short schemaId = (short) tableInfo.getSchemaId(); + boolean defaultBucketKey = tableInfo.isDefaultBucketKey(); + return new KeyRecordBatch( + keyBytes, + kvFormat, + kvFormatVersion, + schemaId, + defaultBucketKey, + tableConfig.getDataLakeFormat().orElse(null)); + } + + @VisibleForTesting + KeyRecordBatch(List<byte[]> keyBytes, KvFormat kvFormat, short schemaId) { + this(keyBytes, kvFormat, (short) 1, schemaId, true, null); + } + + /** + * Creates a KeyRecordBatch for lazy KvRecord construction. 
+ * + * @param keyBytes the list of encoded key bytes + */ + public KeyRecordBatch( + List<byte[]> keyBytes, + KvFormat kvFormat, + short kvFormatVersion, + short schemaId, + boolean defaultBucketKey, + @Nullable DataLakeFormat lakeFormat) { + this.keyBytes = keyBytes; + this.schemaId = schemaId; + this.kvFormat = kvFormat; + this.kvFormatVersion = kvFormatVersion; + this.defaultBucketKey = defaultBucketKey; + this.lakeFormat = lakeFormat; + } + + @Override + public Iterable<KvRecord> records(ReadContext readContext) { + return () -> new LazyKvRecordIterator(readContext); + } + + @Override + public int getRecordCount() { + return keyBytes.size(); + } + + @Override + public short schemaId() { + return schemaId; + } + + @Override + public boolean isValid() { + return true; + } + + @Override + public void ensureValid() { + // In-memory batch is always valid + } + + @Override + public long checksum() { + return 0; + } + + @Override + public byte magic() { + return CURRENT_KV_MAGIC_VALUE; + } + + @Override + public long writerId() { + return NO_WRITER_ID; + } + + @Override + public int batchSequence() { + return NO_BATCH_SEQUENCE; + } + + @Override + public int sizeInBytes() { + return keyBytes.stream().mapToInt(k -> k.length).sum(); + } + + /** Lazy iterator that constructs KvRecord on-demand during iteration. 
*/ + private class LazyKvRecordIterator implements Iterator<KvRecord> { + private int index = 0; + + private final int fieldCount; + private final int[] keyIdxByFieldIdx; // Maps field index to key index (-1 if not a key) + private final InternalRow.FieldGetter[] keyFieldGetters; + private final RowEncoder rowEncoder; + private final KeyDecoder keyDecoder; + + LazyKvRecordIterator(ReadContext context) { + SchemaGetter schemaGetter = context.getSchemaGetter(); + Schema schema = schemaGetter.getSchema(schemaId); + RowType rowType = schema.getRowType(); + List<String> keyColumnNames = schema.getPrimaryKeyColumnNames(); + + this.fieldCount = rowType.getFieldCount(); + int[] keyFieldIndexes = schema.getPrimaryKeyIndexes(); + this.keyDecoder = + KeyDecoder.ofPrimaryKeyDecoder( + rowType, keyColumnNames, kvFormatVersion, lakeFormat, defaultBucketKey); + this.keyFieldGetters = new InternalRow.FieldGetter[keyColumnNames.size()]; + + // Build reverse mapping: field index -> key index + this.keyIdxByFieldIdx = new int[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + keyIdxByFieldIdx[i] = -1; + } + for (int keyIdx = 0; keyIdx < keyFieldIndexes.length; keyIdx++) { + keyIdxByFieldIdx[keyFieldIndexes[keyIdx]] = keyIdx; + } + + // Create field getters for key fields (reading from decoded key row at position i) + for (int i = 0; i < keyColumnNames.size(); i++) { + DataType keyFieldType = rowType.getTypeAt(keyFieldIndexes[i]); + keyFieldGetters[i] = InternalRow.createFieldGetter(keyFieldType, i); + } + + this.rowEncoder = RowEncoder.create(kvFormat, rowType); + } + + @Override + public boolean hasNext() { + return index < keyBytes.size(); + } + + @Override + public KvRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + byte[] key = keyBytes.get(index++); + + InternalRow keyRow = keyDecoder.decodeKey(key); + + // Build full row: key fields from decoded values, non-key fields as null + rowEncoder.startNewRow(); + for (int i = 0; i < fieldCount; i++) { 
+ int keyIdx = keyIdxByFieldIdx[i]; + if (keyIdx != -1) { + // This is a key field - copy from decoded key row + Object value = keyFieldGetters[keyIdx].getFieldOrNull(keyRow); + rowEncoder.encodeField(i, value); + } else { + // Non-key field - set to null + rowEncoder.encodeField(i, null); + } + } + BinaryRow binaryRow = rowEncoder.finishRow(); + + return new KeyOnlyKvRecord(key, binaryRow); + } + } + + /** Simple KvRecord implementation for key-only records. */ + private static class KeyOnlyKvRecord implements KvRecord { + private final byte[] key; + private final BinaryRow row; + + KeyOnlyKvRecord(byte[] key, BinaryRow row) { + this.key = key; + this.row = row; + } + + @Override + public ByteBuffer getKey() { + return ByteBuffer.wrap(key); + } + + @Override + public BinaryRow getRow() { + return row; + } + + @Override + public int getSizeInBytes() { + return key.length + (row != null ? row.getSizeInBytes() : 0); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java index 66c999c28..0689ecadd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java @@ -18,6 +18,7 @@ package org.apache.fluss.record; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.row.decode.RowDecoder; /** @@ -107,6 +108,9 @@ public interface KvRecordBatch { /** The read context of a {@link KvRecordBatch} to read records. */ interface ReadContext { + /** Gets the schema getter to retrieve schema for decoding record bytes. */ + SchemaGetter getSchemaGetter(); + /** * Gets the row decoder for the given schema to decode bytes read from {@link * KvRecordBatch}. 
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java index 5fffc3f4f..7e1878fc4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java @@ -44,6 +44,11 @@ public class KvRecordReadContext implements KvRecordBatch.ReadContext { return new KvRecordReadContext(kvFormat, schemaGetter); } + @Override + public SchemaGetter getSchemaGetter() { + return schemaGetter; + } + @Override public RowDecoder getRowDecoder(int schemaId) { return rowDecoderCache.computeIfAbsent( diff --git a/fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedKeyDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedKeyDecoder.java new file mode 100644 index 000000000..bebc210bc --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedKeyDecoder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.row.decode; + +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.compacted.CompactedRowDeserializer; +import org.apache.fluss.row.compacted.CompactedRowReader; +import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.util.List; + +/** + * A decoder to decode key bytes (encoded by {@link CompactedKeyEncoder}) back to {@link + * InternalRow}. + */ +public class CompactedKeyDecoder implements KeyDecoder { + + private final DataType[] keyDataTypes; + + /** + * Create a key decoder to decode the key bytes back to a row containing only key fields. + * + * @param rowType the row type used to locate key field positions + * @param keys the key field names to decode + */ + public static CompactedKeyDecoder createKeyDecoder(RowType rowType, List<String> keys) { + int[] keyFieldPos = new int[keys.size()]; + for (int i = 0; i < keys.size(); i++) { + keyFieldPos[i] = rowType.getFieldIndex(keys.get(i)); + if (keyFieldPos[i] == -1) { + throw new IllegalArgumentException( + "Field " + keys.get(i) + " not found in row type " + rowType); + } + } + return new CompactedKeyDecoder(rowType, keyFieldPos); + } + + public CompactedKeyDecoder(RowType rowType, int[] keyFieldPos) { + // for decoding key fields + keyDataTypes = new DataType[keyFieldPos.length]; + for (int i = 0; i < keyFieldPos.length; i++) { + keyDataTypes[i] = rowType.getTypeAt(keyFieldPos[i]).copy(false); + } + } + + /** + * Decode the key bytes back to a row containing only key fields. Non-key fields are not + * included in the returned row. 
+ * + * @param keyBytes the key bytes encoded by {@link CompactedKeyEncoder} + * @return a row containing only the decoded key fields + */ + public InternalRow decodeKey(byte[] keyBytes) { + // Decode key fields directly into a GenericRow matching the decoded key size + CompactedRowReader compactedRowReader = new CompactedRowReader(0); + compactedRowReader.pointTo(MemorySegment.wrap(keyBytes), 0, keyBytes.length); + + CompactedRowDeserializer compactedRowDeserializer = + new CompactedRowDeserializer(keyDataTypes); + GenericRow genericRow = new GenericRow(keyDataTypes.length); + compactedRowDeserializer.deserialize(compactedRowReader, genericRow); + return genericRow; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java new file mode 100644 index 000000000..770526879 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.row.decode; + +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.decode.iceberg.IcebergKeyDecoder; +import org.apache.fluss.row.decode.paimon.PaimonKeyDecoder; +import org.apache.fluss.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Interface for decoding key bytes back to {@link InternalRow}. + * + * <p>This interface provides functionality to decode binary key bytes into internal row + * representation, typically used for primary key decoding in KV tables. + */ +public interface KeyDecoder { + /** Decode key bytes to a row containing only key fields, without non-key fields. */ + InternalRow decodeKey(byte[] keyBytes); + + /** + * Creates a primary key decoder based on KV format version and data lake format. + * + * <p>Behavior aligns with {@link org.apache.fluss.row.encode.KeyEncoder#ofPrimaryKeyEncoder}. + * + * @param rowType the row type containing all fields + * @param keyFields the list of primary key field names + * @param kvFormatVersion the KV format version (1 or 2) + * @param lakeFormat the data lake format, null if not using lake storage + * @param isDefaultBucketKey whether using default bucket key (primary key as bucket key) + * @return the corresponding key decoder + */ + static KeyDecoder ofPrimaryKeyDecoder( + RowType rowType, + List<String> keyFields, + short kvFormatVersion, + @Nullable DataLakeFormat lakeFormat, + boolean isDefaultBucketKey) { + if (kvFormatVersion == 1 || (kvFormatVersion == 2 && isDefaultBucketKey)) { + if (lakeFormat == null || lakeFormat == DataLakeFormat.LANCE) { + return CompactedKeyDecoder.createKeyDecoder(rowType, keyFields); + } + if (lakeFormat == DataLakeFormat.PAIMON) { + return new PaimonKeyDecoder(rowType, keyFields); + } + if (lakeFormat == DataLakeFormat.ICEBERG) { + return new IcebergKeyDecoder(rowType, keyFields); + } + throw new UnsupportedOperationException( + 
"Unsupported datalake format for key decoding: " + lakeFormat); + } + if (kvFormatVersion == 2) { + // use CompactedKeyEncoder to support prefix look up + return CompactedKeyDecoder.createKeyDecoder(rowType, keyFields); + } + throw new UnsupportedOperationException( + "Unsupported kv format version: " + kvFormatVersion); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java new file mode 100644 index 000000000..dd746e986 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.row.decode.iceberg; + +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.decode.KeyDecoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.util.List; + +import static org.apache.fluss.types.DataTypeChecks.getPrecision; +import static org.apache.fluss.types.DataTypeChecks.getScale; +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Decoder for Iceberg-encoded key bytes. + * + * <p>This decoder reverses the encoding performed by {@link + * org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder}. + * + * <p>The binary format follows Iceberg's encoding strategy: + * + * <ul> + * <li>Only single key field is supported + * <li>Integer types are encoded as long (8 bytes) in LITTLE_ENDIAN + * <li>Time is encoded as microseconds (long, 8 bytes) + * <li>Timestamp is encoded as microseconds (long, 8 bytes) + * <li>Strings/bytes/decimals have NO length prefix (direct binary data) + * <li>Decimals use BIG_ENDIAN byte order + * </ul> + */ +public class IcebergKeyDecoder implements KeyDecoder { + + private final FieldReader fieldReader; + + public IcebergKeyDecoder(RowType rowType, List<String> keys) { + checkArgument( + keys.size() == 1, + "Key fields must have exactly one field for iceberg format, but got: %s", + keys); + + int keyIndex = rowType.getFieldIndex(keys.get(0)); + DataType keyDataType = rowType.getTypeAt(keyIndex); + this.fieldReader = createFieldReader(keyDataType); + } + + @Override + public InternalRow decodeKey(byte[] keyBytes) { + MemorySegment segment = MemorySegment.wrap(keyBytes); + GenericRow row = new GenericRow(1); + Object value = fieldReader.readField(segment, 0); + row.setField(0, value); + return row; + } + + private 
FieldReader createFieldReader(DataType fieldType) { + switch (fieldType.getTypeRoot()) { + case INTEGER: + case DATE: + // Integer is encoded as long (8 bytes) + return (segment, offset) -> (int) segment.getLong(offset); + + case TIME_WITHOUT_TIME_ZONE: + // Time encoded as microseconds long + return (segment, offset) -> { + long micros = segment.getLong(offset); + return (int) (micros / 1000L); + }; + + case BIGINT: + return MemorySegment::getLong; + + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (segment, offset) -> { + long micros = segment.getLong(offset); + long millis = micros / 1000L; + int nanoOfMillis = (int) ((micros % 1000L) * 1000L); + return TimestampNtz.fromMillis(millis, nanoOfMillis); + }; + + case DECIMAL: + final int decimalPrecision = getPrecision(fieldType); + final int decimalScale = getScale(fieldType); + return (segment, offset) -> { + // Decimal bytes are written directly without length prefix + int length = segment.size() - offset; + byte[] bytes = new byte[length]; + segment.get(offset, bytes, 0, length); + return Decimal.fromUnscaledBytes(bytes, decimalPrecision, decimalScale); + }; + + case STRING: + case CHAR: + return (segment, offset) -> { + // String bytes are written directly without length prefix + int length = segment.size() - offset; + byte[] bytes = new byte[length]; + segment.get(offset, bytes, 0, length); + return BinaryString.fromBytes(bytes); + }; + + case BINARY: + case BYTES: + return (segment, offset) -> { + // Binary bytes are written directly without length prefix + int length = segment.size() - offset; + byte[] bytes = new byte[length]; + segment.get(offset, bytes, 0, length); + return bytes; + }; + + default: + throw new IllegalArgumentException( + "Unsupported type for Iceberg key decoder: " + fieldType); + } + } + + /** Accessor for reading fields from Iceberg binary format. 
*/ + interface FieldReader { + Object readField(MemorySegment segment, int offset); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java new file mode 100644 index 000000000..f9a319022 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.row.decode.paimon; + +import org.apache.fluss.memory.MemorySegment; +import org.apache.fluss.row.BinarySegmentUtils; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.decode.KeyDecoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.util.List; + +import static org.apache.fluss.types.DataTypeChecks.getPrecision; +import static org.apache.fluss.types.DataTypeChecks.getScale; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * An implementation of {@link KeyDecoder} to decode keys following Paimon's encoding strategy. + * + * <p>This decoder reverses the encoding performed by {@link + * org.apache.fluss.row.encode.paimon.PaimonKeyEncoder}. + * + * <p>The binary format follows Paimon's BinaryRow layout: + * + * <pre> + * Fixed-length part: + * - 1 byte header (RowKind) + * - Null bit set (aligned to 8-byte boundaries) + * - Field values (8 bytes each): + * - For fixed-length types: actual values + * - For variable-length types: offset (4 bytes) + length (4 bytes) + * + * Variable-length part: + * - Actual data for variable-length fields (strings, bytes, etc.) 
+ * </pre> + */ +public class PaimonKeyDecoder implements KeyDecoder { + + private static final int HEADER_SIZE_IN_BITS = 8; + + private final DataType[] keyDataTypes; + private final FieldReader[] fieldReaders; + private final int nullBitsSizeInBytes; + + public PaimonKeyDecoder(RowType rowType, List<String> keys) { + this.keyDataTypes = new DataType[keys.size()]; + this.fieldReaders = new FieldReader[keys.size()]; + this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(keys.size()); + for (int i = 0; i < keys.size(); i++) { + int keyIndex = rowType.getFieldIndex(keys.get(i)); + this.keyDataTypes[i] = rowType.getTypeAt(keyIndex); + this.fieldReaders[i] = createFieldReader(keyDataTypes[i], i); + } + } + + @Override + public InternalRow decodeKey(byte[] keyBytes) { + MemorySegment segment = MemorySegment.wrap(keyBytes); + GenericRow row = new GenericRow(keyDataTypes.length); + + for (int i = 0; i < keyDataTypes.length; i++) { + if (isNullAt(segment, i)) { + row.setField(i, null); + } else { + Object value = fieldReaders[i].readField(segment, 0); + row.setField(i, value); + } + } + + return row; + } + + private boolean isNullAt(MemorySegment segment, int pos) { + return BinarySegmentUtils.bitGet(segment, 0, pos + HEADER_SIZE_IN_BITS); + } + + private int getFieldOffset(int pos) { + return nullBitsSizeInBytes + pos * 8; + } + + private static int calculateBitSetWidthInBytes(int arity) { + return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8; + } + + private FieldReader createFieldReader(DataType fieldType, int pos) { + final int fieldOffset = getFieldOffset(pos); + + switch (fieldType.getTypeRoot()) { + case BOOLEAN: + return (segment, baseOffset) -> segment.getBoolean(baseOffset + fieldOffset); + case TINYINT: + return (segment, baseOffset) -> segment.get(baseOffset + fieldOffset); + case SMALLINT: + return (segment, baseOffset) -> segment.getShort(baseOffset + fieldOffset); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return (segment, baseOffset) 
-> segment.getInt(baseOffset + fieldOffset); + case BIGINT: + return (segment, baseOffset) -> segment.getLong(baseOffset + fieldOffset); + case FLOAT: + return (segment, baseOffset) -> segment.getFloat(baseOffset + fieldOffset); + case DOUBLE: + return (segment, baseOffset) -> segment.getDouble(baseOffset + fieldOffset); + case CHAR: + case STRING: + return (segment, baseOffset) -> readString(segment, baseOffset, fieldOffset); + case BINARY: + case BYTES: + return (segment, baseOffset) -> readBinary(segment, baseOffset, fieldOffset); + case DECIMAL: + final int decimalPrecision = getPrecision(fieldType); + final int decimalScale = getScale(fieldType); + return (segment, baseOffset) -> + readDecimal( + segment, baseOffset, fieldOffset, decimalPrecision, decimalScale); + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampNtzPrecision = getPrecision(fieldType); + return (segment, baseOffset) -> + readTimestampNtz(segment, baseOffset, fieldOffset, timestampNtzPrecision); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampLtzPrecision = getPrecision(fieldType); + return (segment, baseOffset) -> + readTimestampLtz(segment, baseOffset, fieldOffset, timestampLtzPrecision); + default: + throw new IllegalArgumentException( + "Unsupported type for Paimon key decoder: " + fieldType); + } + } + + private BinaryString readString(MemorySegment segment, int baseOffset, int fieldOffset) { + final long offsetAndLen = segment.getLong(baseOffset + fieldOffset); + return BinarySegmentUtils.readBinaryString( + new MemorySegment[] {segment}, 0, fieldOffset, offsetAndLen); + } + + private byte[] readBinary(MemorySegment segment, int baseOffset, int fieldOffset) { + final long offsetAndLen = segment.getLong(baseOffset + fieldOffset); + return BinarySegmentUtils.readBinary( + new MemorySegment[] {segment}, 0, fieldOffset, offsetAndLen); + } + + private Decimal readDecimal( + MemorySegment segment, int baseOffset, int fieldOffset, int precision, int scale) { + if 
(Decimal.isCompact(precision)) { + return Decimal.fromUnscaledLong( + segment.getLong(baseOffset + fieldOffset), precision, scale); + } + + final long offsetAndSize = segment.getLong(baseOffset + fieldOffset); + return BinarySegmentUtils.readDecimal( + new MemorySegment[] {segment}, 0, offsetAndSize, precision, scale); + } + + private TimestampNtz readTimestampNtz( + MemorySegment segment, int baseOffset, int fieldOffset, int precision) { + if (TimestampNtz.isCompact(precision)) { + return TimestampNtz.fromMillis(segment.getLong(baseOffset + fieldOffset)); + } + + final long offsetAndNanoOfMilli = segment.getLong(baseOffset + fieldOffset); + final int nanoOfMillisecond = (int) offsetAndNanoOfMilli; + final int subOffset = (int) (offsetAndNanoOfMilli >> 32); + final long millisecond = segment.getLong(subOffset); + return TimestampNtz.fromMillis(millisecond, nanoOfMillisecond); + } + + private TimestampLtz readTimestampLtz( + MemorySegment segment, int baseOffset, int fieldOffset, int precision) { + if (TimestampLtz.isCompact(precision)) { + return TimestampLtz.fromEpochMillis(segment.getLong(baseOffset + fieldOffset)); + } + + final long offsetAndNanoOfMilli = segment.getLong(baseOffset + fieldOffset); + final int nanoOfMillisecond = (int) offsetAndNanoOfMilli; + final int subOffset = (int) (offsetAndNanoOfMilli >> 32); + final long millisecond = segment.getLong(subOffset); + return TimestampLtz.fromEpochMillis(millisecond, nanoOfMillisecond); + } + + /** Accessor for reading fields from Paimon binary format. 
*/ + interface FieldReader { + Object readField(MemorySegment segment, int baseOffset); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java index d4f098f9b..80bc37121 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java @@ -227,6 +227,7 @@ class PaimonBinaryRowWriter { segment.putLong(cursor, value.getEpochMillisecond()); setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); } + cursor += 8; } } diff --git a/fluss-common/src/test/java/org/apache/fluss/record/KeyRecordBatchTest.java b/fluss-common/src/test/java/org/apache/fluss/record/KeyRecordBatchTest.java new file mode 100644 index 000000000..88d0ada50 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/record/KeyRecordBatchTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.record; + +import org.apache.fluss.metadata.KvFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Iterator; + +import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link KeyRecordBatch}. */ +class KeyRecordBatchTest extends KvTestBase { + + @Test + void testSingleKeyField() { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .primaryKey("a") + .build(); + + CompactedKeyEncoder keyEncoder = + new CompactedKeyEncoder(schema.getRowType(), new int[] {0}); + byte[] key1 = keyEncoder.encodeKey(row(1)); + byte[] key2 = keyEncoder.encodeKey(row(2)); + + KeyRecordBatch batch = + new KeyRecordBatch( + Arrays.asList(key1, key2), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID); + KvRecordBatch.ReadContext context = createContext(schema); + Iterator<KvRecord> iterator = batch.records(context).iterator(); + + BinaryRow row1 = iterator.next().getRow(); + assertThat(row1.getInt(0)).isEqualTo(1); + assertThat(row1.isNullAt(1)).isTrue(); + assertThat(row1.isNullAt(2)).isTrue(); + + BinaryRow row2 = iterator.next().getRow(); + assertThat(row2.getInt(0)).isEqualTo(2); + assertThat(row2.isNullAt(1)).isTrue(); + assertThat(row2.isNullAt(2)).isTrue(); + + assertThat(iterator.hasNext()).isFalse(); + } + + @Test + void testMultipleKeyFields() { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.DOUBLE()) + .primaryKey("a", "b", "c") + .build(); + + CompactedKeyEncoder keyEncoder = + new 
CompactedKeyEncoder(schema.getRowType(), new int[] {0, 1, 2}); + byte[] key1 = keyEncoder.encodeKey(row(100, "key1", 1000L)); + byte[] key2 = keyEncoder.encodeKey(row(200, "key2", 2000L)); + + KeyRecordBatch batch = + new KeyRecordBatch( + Arrays.asList(key1, key2), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID); + KvRecordBatch.ReadContext context = createContext(schema); + Iterator<KvRecord> iterator = batch.records(context).iterator(); + + BinaryRow row1 = iterator.next().getRow(); + assertThat(row1.getInt(0)).isEqualTo(100); + assertThat(row1.getString(1).toString()).isEqualTo("key1"); + assertThat(row1.getLong(2)).isEqualTo(1000L); + assertThat(row1.isNullAt(3)).isTrue(); + + BinaryRow row2 = iterator.next().getRow(); + assertThat(row2.getInt(0)).isEqualTo(200); + assertThat(row2.getString(1).toString()).isEqualTo("key2"); + assertThat(row2.getLong(2)).isEqualTo(2000L); + assertThat(row2.isNullAt(3)).isTrue(); + + assertThat(iterator.hasNext()).isFalse(); + } + + @Test + void testNonSequentialKeyFields() { + // Key at end: pk=(c), fields=(a, b, c) + Schema schema1 = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .primaryKey("c") + .build(); + CompactedKeyEncoder encoder1 = new CompactedKeyEncoder(schema1.getRowType(), new int[] {2}); + byte[] key1 = encoder1.encodeKey(row(schema1.getRowType(), null, null, 1000L)); + KeyRecordBatch batch1 = + new KeyRecordBatch(Arrays.asList(key1), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID); + BinaryRow row1 = batch1.records(createContext(schema1)).iterator().next().getRow(); + assertThat(row1.isNullAt(0)).isTrue(); + assertThat(row1.isNullAt(1)).isTrue(); + assertThat(row1.getLong(2)).isEqualTo(1000L); + + // Non-consecutive: pk=(b, d), fields=(a, b, c, d, e) + Schema schema2 = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.DOUBLE()) + .column("e", 
DataTypes.BOOLEAN()) + .primaryKey("b", "d") + .build(); + CompactedKeyEncoder encoder2 = + new CompactedKeyEncoder(schema2.getRowType(), new int[] {1, 3}); + byte[] key2 = encoder2.encodeKey(row(schema2.getRowType(), null, "key1", null, 1.5, null)); + KeyRecordBatch batch2 = + new KeyRecordBatch(Arrays.asList(key2), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID); + BinaryRow row2 = batch2.records(createContext(schema2)).iterator().next().getRow(); + assertThat(row2.isNullAt(0)).isTrue(); + assertThat(row2.getString(1).toString()).isEqualTo("key1"); + assertThat(row2.isNullAt(2)).isTrue(); + assertThat(row2.getDouble(3)).isEqualTo(1.5); + assertThat(row2.isNullAt(4)).isTrue(); + + // Complex ordering: pk=(d, a, c), fields=(a, b, c, d, e, f) + Schema schema3 = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.DOUBLE()) + .column("e", DataTypes.BOOLEAN()) + .column("f", DataTypes.FLOAT()) + .primaryKey("d", "a", "c") + .build(); + CompactedKeyEncoder encoder3 = + new CompactedKeyEncoder(schema3.getRowType(), new int[] {3, 0, 2}); + byte[] key3 = encoder3.encodeKey(row(100, null, 1000L, 1.5, null, null)); + KeyRecordBatch batch3 = + new KeyRecordBatch(Arrays.asList(key3), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID); + BinaryRow row3 = batch3.records(createContext(schema3)).iterator().next().getRow(); + assertThat(row3.getInt(0)).isEqualTo(100); + assertThat(row3.isNullAt(1)).isTrue(); + assertThat(row3.getLong(2)).isEqualTo(1000L); + assertThat(row3.getDouble(3)).isEqualTo(1.5); + assertThat(row3.isNullAt(4)).isTrue(); + assertThat(row3.isNullAt(5)).isTrue(); + } + + @Test + void testEmptyBatch() { + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .primaryKey("a") + .build(); + + KeyRecordBatch batch = + new KeyRecordBatch(Arrays.asList(), KvFormat.COMPACTED, DEFAULT_SCHEMA_ID); + 
assertThat(batch.getRecordCount()).isEqualTo(0); + assertThat(batch.records(createContext(schema)).iterator().hasNext()).isFalse(); + } + + private KvRecordBatch.ReadContext createContext(Schema schema) { + return KvRecordReadContext.createReadContext( + KvFormat.COMPACTED, new TestingSchemaGetter(DEFAULT_SCHEMA_ID, schema)); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java index 874377125..b67e6443e 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java @@ -216,6 +216,34 @@ public final class TestData { .withComment("b is second column") .primaryKey("a") .build(); + + // DATA3 with auto-increment column for testing lookup-insert-if-not-exists + public static final RowType DATA3_ROW_TYPE = + DataTypes.ROW( + new DataField("a", DataTypes.INT()), + new DataField("b", DataTypes.STRING()), + new DataField("c", DataTypes.BIGINT())); + public static final RowType DATA3_KEY_TYPE = DataTypes.ROW(new DataField("a", DataTypes.INT())); + public static final Schema DATA3_SCHEMA_PK_AUTO_INC = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is primary key column") + .column("b", DataTypes.STRING()) + .withComment("b is regular column") + .column("c", DataTypes.BIGINT()) + .withComment("c is auto-increment column") + .primaryKey("a") + .enableAutoIncrement("c") + .build(); + public static final TablePath DATA3_TABLE_PATH_PK_AUTO_INC = + TablePath.of("test_db_3", "test_pk_table_auto_inc"); + public static final long DATA3_TABLE_ID_PK_AUTO_INC = 150004L; + public static final TableDescriptor DATA3_TABLE_DESCRIPTOR_PK_AUTO_INC = + TableDescriptor.builder() + .schema(DATA3_SCHEMA_PK_AUTO_INC) + .distributedBy(3, "a") + .build(); + // ---------------------------- data3 table info end ------------------------------ public static final TestingSchemaGetter 
TEST_SCHEMA_GETTER = diff --git a/fluss-common/src/test/java/org/apache/fluss/row/decode/CompactedKeyDecoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/decode/CompactedKeyDecoderTest.java new file mode 100644 index 000000000..529fb4c57 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/row/decode/CompactedKeyDecoderTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.row.decode; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link CompactedKeyDecoder}. */ +class CompactedKeyDecoderTest { + + @Test + void testDecodeKey() { + // All fields as keys + verifyDecode( + RowType.of(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.INT()), + new int[] {0, 1, 2}, + row(1, 3L, 2), + (decoded, original) -> { + assertThat(decoded.getFieldCount()).isEqualTo(3); + assertThat(decoded.getInt(0)).isEqualTo(1); + assertThat(decoded.getLong(1)).isEqualTo(3L); + assertThat(decoded.getInt(2)).isEqualTo(2); + }); + + // Partial key - single INT key + RowType type1 = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()}, + new String[] {"id", "name", "value"}); + verifyDecode( + type1, + Collections.singletonList("id"), + row(100, "Alice", 999L), + (decoded, original) -> { + assertThat(decoded.getFieldCount()).isEqualTo(1); + assertThat(decoded.getInt(0)).isEqualTo(100); + }); + + // Single STRING key at non-first position + RowType type2 = + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"partition", "f1", "f2"}); + verifyDecode( + type2, + Collections.singletonList("f2"), + row("p1", 1L, "a2"), + (decoded, original) -> { + 
assertThat(decoded.getFieldCount()).isEqualTo(1); + assertThat(decoded.getString(0).toString()).isEqualTo("a2"); + }); + + // Multiple keys (INT, STRING) + RowType type3 = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.BIGINT(), + DataTypes.DOUBLE(), + DataTypes.BOOLEAN() + }, + new String[] {"id", "name", "age", "score", "active"}); + verifyDecode( + type3, + Arrays.asList("id", "name"), + row(100, "Alice", 25L, 95.5, true), + (decoded, original) -> { + assertThat(decoded.getFieldCount()).isEqualTo(2); + assertThat(decoded.getInt(0)).isEqualTo(100); + assertThat(decoded.getString(1).toString()).isEqualTo("Alice"); + }); + + // Non-sequential key positions + RowType type4 = + RowType.of( + new DataType[] { + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.BIGINT(), + DataTypes.STRING() + }, + new String[] {"a", "b", "c", "d", "e"}); + verifyDecode( + type4, + Arrays.asList("b", "d"), + row("v0", 42, "v2", 999L, "v4"), + (decoded, original) -> { + assertThat(decoded.getFieldCount()).isEqualTo(2); + assertThat(decoded.getInt(0)).isEqualTo(42); + assertThat(decoded.getLong(1)).isEqualTo(999L); + }); + + // Various data types at non-sequential positions + RowType type5 = + RowType.of( + new DataType[] { + DataTypes.BOOLEAN(), + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.STRING() + }, + new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}); + verifyDecode( + type5, + Arrays.asList("b", "d", "g"), + row(true, (byte) 1, (short) 2, 3, 4L, 5.0f, 6.0, BinaryString.fromString("test")), + (decoded, original) -> { + assertThat(decoded.getFieldCount()).isEqualTo(3); + assertThat(decoded.getByte(0)).isEqualTo((byte) 1); + assertThat(decoded.getInt(1)).isEqualTo(3); + assertThat(decoded.getDouble(2)).isEqualTo(6.0); + }); + } + + @Test + void testEncodeDecodeRoundTrip() { + RowType rowType = + RowType.of( + new 
DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()}, + new String[] {"id", "name", "value"}); + List<String> pk = Arrays.asList("id", "name"); + CompactedKeyEncoder encoder = CompactedKeyEncoder.createKeyEncoder(rowType, pk); + CompactedKeyDecoder decoder = CompactedKeyDecoder.createKeyDecoder(rowType, pk); + + for (InternalRow original : + new InternalRow[] { + row(1, "alice", 100L), + row(2, "bob", 200L), + row(999, "test", 999L), + row(0, "", 0L) + }) { + InternalRow decoded = decoder.decodeKey(encoder.encodeKey(original)); + assertThat(decoded.getInt(0)).isEqualTo(original.getInt(0)); + assertThat(decoded.getString(1).toString()).isEqualTo(original.getString(1).toString()); + } + } + + @Test + void testDecodeInvalidKey() { + RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); + assertThatThrownBy( + () -> + CompactedKeyDecoder.createKeyDecoder( + rowType, Collections.singletonList("invalidField"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("invalidField not found in row type"); + } + + @Test + void testAllSupportedPrimaryKeyTypes() { + // Test all supported primary key data types + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.BOOLEAN(), + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.STRING(), + DataTypes.CHAR(10), + DataTypes.BINARY(5), + DataTypes.BYTES(), + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_LTZ(), + DataTypes.DECIMAL(10, 2) + }, + new String[] { + "f_boolean", + "f_tinyint", + "f_smallint", + "f_int", + "f_bigint", + "f_float", + "f_double", + "f_string", + "f_char", + "f_binary", + "f_bytes", + "f_date", + "f_time", + "f_timestamp", + "f_timestamp_ltz", + "f_decimal" + }); + + List<String> allPrimaryKeys = + Arrays.asList( + "f_boolean", + "f_tinyint", + "f_smallint", + "f_int", + "f_bigint", + "f_float", + "f_double", + "f_string", + 
"f_char", + "f_binary", + "f_bytes", + "f_date", + "f_time", + "f_timestamp", + "f_timestamp_ltz", + "f_decimal"); + + // Test data values + InternalRow testRow = + row( + true, // BOOLEAN + (byte) 127, // TINYINT + (short) 32767, // SMALLINT + 2147483647, // INT + 9223372036854775807L, // BIGINT + 3.14f, // FLOAT + 2.718281828, // DOUBLE + BinaryString.fromString("test_string"), // STRING + BinaryString.fromString("test_char"), // CHAR + new byte[] {1, 2, 3, 4, 5}, // BINARY + new byte[] {10, 20, 30}, // BYTES + 18000, // DATE (days since epoch) + 3600000, // TIME (milliseconds since midnight) + TimestampNtz.fromMillis(1609459200000L), // TIMESTAMP + TimestampLtz.fromEpochMillis(1609459200000L), // TIMESTAMP_LTZ + Decimal.fromBigDecimal( + new java.math.BigDecimal("12345678.90"), 10, 2) // DECIMAL + ); + + verifyDecode( + rowType, + allPrimaryKeys, + testRow, + (decoded, original) -> { + assertThat(decoded.getFieldCount()).isEqualTo(16); + assertThat(decoded.getBoolean(0)).isEqualTo(true); + assertThat(decoded.getByte(1)).isEqualTo((byte) 127); + assertThat(decoded.getShort(2)).isEqualTo((short) 32767); + assertThat(decoded.getInt(3)).isEqualTo(2147483647); + assertThat(decoded.getLong(4)).isEqualTo(9223372036854775807L); + assertThat(decoded.getFloat(5)).isEqualTo(3.14f); + assertThat(decoded.getDouble(6)).isEqualTo(2.718281828); + assertThat(decoded.getString(7).toString()).isEqualTo("test_string"); + assertThat(decoded.getChar(8, 10).toString()).isEqualTo("test_char"); + assertThat(decoded.getBinary(9, 5)).isEqualTo(new byte[] {1, 2, 3, 4, 5}); + assertThat(decoded.getBytes(10)).isEqualTo(new byte[] {10, 20, 30}); + assertThat(decoded.getInt(11)).isEqualTo(18000); + assertThat(decoded.getInt(12)).isEqualTo(3600000); + assertThat(decoded.getTimestampNtz(13, 6).getMillisecond()) + .isEqualTo(1609459200000L); + assertThat(decoded.getTimestampLtz(14, 6).getEpochMillisecond()) + .isEqualTo(1609459200000L); + assertThat(decoded.getDecimal(15, 10, 2).toBigDecimal()) + 
.isEqualTo(new java.math.BigDecimal("12345678.90")); + }); + } + + private void verifyDecode( + RowType rowType, + int[] keyPos, + InternalRow original, + BiConsumer<InternalRow, InternalRow> assertions) { + CompactedKeyEncoder encoder = new CompactedKeyEncoder(rowType, keyPos); + CompactedKeyDecoder decoder = new CompactedKeyDecoder(rowType, keyPos); + assertions.accept(decoder.decodeKey(encoder.encodeKey(original)), original); + } + + private void verifyDecode( + RowType rowType, + List<String> keys, + InternalRow original, + BiConsumer<InternalRow, InternalRow> assertions) { + CompactedKeyEncoder encoder = CompactedKeyEncoder.createKeyEncoder(rowType, keys); + CompactedKeyDecoder decoder = CompactedKeyDecoder.createKeyDecoder(rowType, keys); + assertions.accept(decoder.decodeKey(encoder.encodeKey(original)), original); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoderTest.java new file mode 100644 index 000000000..49b384375 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoderTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.decode.iceberg; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Unit tests for {@link IcebergKeyDecoder} to verify decoding correctness and round-trip with + * encoder. + */ +class IcebergKeyDecoderTest { + + @Test + void testSingleKeyFieldRequirement() { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {"id", "name"}); + + // Should succeed with single key + IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, Collections.singletonList("id")); + assertThat(decoder).isNotNull(); + + // Should fail with multiple keys + assertThatThrownBy(() -> new IcebergKeyDecoder(rowType, Arrays.asList("id", "name"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Key fields must have exactly one field"); + } + + @Test + void testDecodeInteger() { + RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"id"}); + IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id")); + IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, Collections.singletonList("id")); + + for (int value : new int[] {0, -42, 42, Integer.MAX_VALUE, Integer.MIN_VALUE}) { + InternalRow original = 
row(value); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getInt(0)).isEqualTo(value); + } + } + + @Test + void testDecodeLong() { + RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"id"}); + IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id")); + IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, Collections.singletonList("id")); + + for (long value : new long[] {0L, -999L, 1234567890123456789L, Long.MAX_VALUE}) { + InternalRow original = row(value); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getLong(0)).isEqualTo(value); + } + } + + @Test + void testDecodeDate() { + RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new String[] {"date"}); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("date")); + IcebergKeyDecoder decoder = + new IcebergKeyDecoder(rowType, Collections.singletonList("date")); + + for (int value : new int[] {0, 19655}) { + InternalRow original = row(value); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getInt(0)).isEqualTo(value); + } + } + + @Test + void testDecodeTime() { + RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new String[] {"time"}); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("time")); + IcebergKeyDecoder decoder = + new IcebergKeyDecoder(rowType, Collections.singletonList("time")); + + for (int value : new int[] {0, 34200000, 86399999}) { + InternalRow original = row(value); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getInt(0)).isEqualTo(value); + } + } + + @Test + void testDecodeTimestamp() { + RowType rowType = RowType.of(new DataType[] 
{DataTypes.TIMESTAMP(6)}, new String[] {"ts"}); + IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("ts")); + IcebergKeyDecoder decoder = new IcebergKeyDecoder(rowType, Collections.singletonList("ts")); + + // Iceberg uses microsecond precision, so only test values that are multiples of 1000 nanos + long[] millisValues = {0L, 1000L, 1698235273182L}; + int[] nanosValues = { + 0, 0, 123000, 999000 + }; // Must be multiples of 1000 for microsecond precision + + for (int i = 0; i < millisValues.length; i++) { + InternalRow original = + row((Object) TimestampNtz.fromMillis(millisValues[i], nanosValues[i])); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + TimestampNtz ts = decoded.getTimestampNtz(0, 6); + assertThat(ts.getMillisecond()).isEqualTo(millisValues[i]); + assertThat(ts.getNanoOfMillisecond()).isEqualTo(nanosValues[i]); + } + } + + @Test + void testDecodeString() { + RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"name"}); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("name")); + IcebergKeyDecoder decoder = + new IcebergKeyDecoder(rowType, Collections.singletonList("name")); + + for (String value : new String[] {"", "a", "Hello", "Hello Iceberg!", "UTF-8: 你好世界"}) { + InternalRow original = row((Object) BinaryString.fromString(value)); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getString(0).toString()).isEqualTo(value); + } + } + + @Test + void testDecodeBinary() { + RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new String[] {"data"}); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("data")); + IcebergKeyDecoder decoder = + new IcebergKeyDecoder(rowType, Collections.singletonList("data")); + + for (byte[] value : + new byte[][] {new byte[0], "test".getBytes(), new 
byte[] {0, 1, 127, -128, -1}}) { + InternalRow original = row((Object) value); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getBytes(0)).isEqualTo(value); + } + } + + @Test + void testDecodeDecimal() { + RowType rowType = + RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new String[] {"amount"}); + IcebergKeyEncoder encoder = + new IcebergKeyEncoder(rowType, Collections.singletonList("amount")); + IcebergKeyDecoder decoder = + new IcebergKeyDecoder(rowType, Collections.singletonList("amount")); + + for (String decimalStr : new String[] {"0.00", "123.45", "-999.99", "12345678.90"}) { + BigDecimal value = new BigDecimal(decimalStr); + Decimal decimal = Decimal.fromBigDecimal(value, 10, 2); + InternalRow original = row((Object) decimal); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getDecimal(0, 10, 2).toBigDecimal()).isEqualTo(value); + } + + // Test high precision decimal + RowType highPrecisionRowType = + RowType.of(new DataType[] {DataTypes.DECIMAL(38, 10)}, new String[] {"big_amount"}); + IcebergKeyEncoder highPrecisionEncoder = + new IcebergKeyEncoder( + highPrecisionRowType, Collections.singletonList("big_amount")); + IcebergKeyDecoder highPrecisionDecoder = + new IcebergKeyDecoder( + highPrecisionRowType, Collections.singletonList("big_amount")); + BigDecimal highValue = new BigDecimal("1234567890123456789012345678.1234567890"); + Decimal highDecimal = Decimal.fromBigDecimal(highValue, 38, 10); + InternalRow highOriginal = row((Object) highDecimal); + byte[] encoded = highPrecisionEncoder.encodeKey(highOriginal); + InternalRow decoded = highPrecisionDecoder.decodeKey(encoded); + assertThat(decoded.getDecimal(0, 38, 10).toBigDecimal()).isEqualTo(highValue); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java 
b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java new file mode 100644 index 000000000..6066d827f --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoderTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.decode.paimon; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.encode.paimon.PaimonKeyEncoder; +import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.row.indexed.IndexedRowWriter; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link PaimonKeyDecoder} to verify decoding correctness and round-trip. 
*/ +class PaimonKeyDecoderTest { + + @Test + void testDecodeAllTypes() { + // Create a row type with all supported types as primary key fields + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.BOOLEAN(), + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.CHAR(5), + DataTypes.STRING(), + DataTypes.BINARY(10), + DataTypes.BYTES(), + DataTypes.DECIMAL(5, 2), + DataTypes.DECIMAL(20, 0), + DataTypes.TIMESTAMP(1), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP_LTZ(1), + DataTypes.TIMESTAMP_LTZ(6) + }, + new String[] { + "bool_col", + "byte_col", + "short_col", + "int_col", + "long_col", + "float_col", + "double_col", + "date_col", + "time_col", + "char_col", + "string_col", + "binary_col", + "bytes_col", + "compact_decimal_col", + "non_compact_decimal_col", + "compact_ts_col", + "non_compact_ts_col", + "compact_ltz_col", + "non_compact_ltz_col" + }); + + List<String> keys = rowType.getFieldNames(); + PaimonKeyEncoder encoder = new PaimonKeyEncoder(rowType, keys); + PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, keys); + + long millis = 1698235273182L; + int nanos = 456789; + + // Create test row with all types + InternalRow original = createAllTypesRow(millis, nanos); + + // Encode and decode + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + + // Verify all fields + assertThat(decoded.getFieldCount()).isEqualTo(19); + assertThat(decoded.getBoolean(0)).isTrue(); + assertThat(decoded.getByte(1)).isEqualTo((byte) 127); + assertThat(decoded.getShort(2)).isEqualTo((short) 32767); + assertThat(decoded.getInt(3)).isEqualTo(Integer.MAX_VALUE); + assertThat(decoded.getLong(4)).isEqualTo(Long.MAX_VALUE); + assertThat(decoded.getFloat(5)) + .isCloseTo(3.14f, org.assertj.core.data.Offset.offset(0.01f)); + assertThat(decoded.getDouble(6)) + .isCloseTo(2.718, 
org.assertj.core.data.Offset.offset(0.001)); + assertThat(decoded.getInt(7)).isEqualTo(19655); + assertThat(decoded.getInt(8)).isEqualTo(34200000); + assertThat(decoded.getString(9).toString()).isEqualTo("hello"); + assertThat(decoded.getString(10).toString()).isEqualTo("world"); + assertThat(decoded.getBytes(11)).isEqualTo("test".getBytes()); + assertThat(decoded.getBytes(12)).isEqualTo("data".getBytes()); + assertThat(decoded.getDecimal(13, 5, 2).toBigDecimal()).isEqualTo(new BigDecimal("123.45")); + assertThat(decoded.getDecimal(14, 20, 0).toBigDecimal()) + .isEqualTo(new BigDecimal("12345678901234567890")); + assertThat(decoded.getTimestampNtz(15, 1).getMillisecond()).isEqualTo(millis); + TimestampNtz decodedTs = decoded.getTimestampNtz(16, 6); + assertThat(decodedTs.getMillisecond()).isEqualTo(millis); + assertThat(decodedTs.getNanoOfMillisecond()).isEqualTo(nanos); + assertThat(decoded.getTimestampLtz(17, 1).getEpochMillisecond()).isEqualTo(millis); + TimestampLtz decodedLtz = decoded.getTimestampLtz(18, 6); + assertThat(decodedLtz.getEpochMillisecond()).isEqualTo(millis); + assertThat(decodedLtz.getNanoOfMillisecond()).isEqualTo(nanos); + } + + @Test + void testDecodeStringVariants() { + RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"s"}); + PaimonKeyEncoder encoder = new PaimonKeyEncoder(rowType, rowType.getFieldNames()); + PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, rowType.getFieldNames()); + + // Test short strings (≤7 bytes with 0x80 marker) and long strings (>7 bytes) + for (String testStr : Arrays.asList("", "a", "1234567", "12345678", "Hello, Paimon!")) { + InternalRow original = createStringRow(testStr); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + assertThat(decoded.getString(0).toString()).isEqualTo(testStr); + } + } + + @Test + void testDecodePartialKeys() { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), 
DataTypes.STRING(), DataTypes.BIGINT()}, + new String[] {"id", "name", "value"}); + + List<String> keys = Collections.singletonList("id"); + PaimonKeyEncoder encoder = new PaimonKeyEncoder(rowType, keys); + PaimonKeyDecoder decoder = new PaimonKeyDecoder(rowType, keys); + + InternalRow original = createPartialKeyRow(); + byte[] encoded = encoder.encodeKey(original); + InternalRow decoded = decoder.decodeKey(encoded); + + assertThat(decoded.getFieldCount()).isEqualTo(1); + assertThat(decoded.getInt(0)).isEqualTo(100); + } + + private InternalRow createAllTypesRow(long millis, int nanos) { + DataType[] dataTypes = + new DataType[] { + DataTypes.BOOLEAN(), + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.CHAR(5), + DataTypes.STRING(), + DataTypes.BINARY(10), + DataTypes.BYTES(), + DataTypes.DECIMAL(5, 2), + DataTypes.DECIMAL(20, 0), + DataTypes.TIMESTAMP(1), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP_LTZ(1), + DataTypes.TIMESTAMP_LTZ(6) + }; + IndexedRow indexedRow = new IndexedRow(dataTypes); + IndexedRowWriter writer = new IndexedRowWriter(dataTypes); + writer.writeBoolean(true); + writer.writeByte((byte) 127); + writer.writeShort((short) 32767); + writer.writeInt(Integer.MAX_VALUE); + writer.writeLong(Long.MAX_VALUE); + writer.writeFloat(3.14f); + writer.writeDouble(2.718); + writer.writeInt(19655); + writer.writeInt(34200000); + writer.writeChar(BinaryString.fromString("hello"), 5); + writer.writeString(BinaryString.fromString("world")); + writer.writeBinary("test".getBytes(), 10); + writer.writeBytes("data".getBytes()); + writer.writeDecimal(Decimal.fromBigDecimal(new BigDecimal("123.45"), 5, 2), 5); + writer.writeDecimal( + Decimal.fromBigDecimal(new BigDecimal("12345678901234567890"), 20, 0), 20); + writer.writeTimestampNtz(TimestampNtz.fromMillis(millis), 1); + writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, 
nanos), 6); + writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis), 1); + writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis, nanos), 6); + indexedRow.pointTo(writer.segment(), 0, writer.position()); + return indexedRow; + } + + private InternalRow createStringRow(String value) { + return row((Object) BinaryString.fromString(value)); + } + + private InternalRow createPartialKeyRow() { + return row(100, BinaryString.fromString("Alice"), 999L); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java b/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java index c1093c97f..e770d96de 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/encode/paimon/PaimonKeyEncoderTest.java @@ -84,7 +84,7 @@ class PaimonKeyEncoderTest { writer.writeTimestampNtz(TimestampNtz.fromMillis(1698235273182L), 1); writer.writeTimestampNtz(TimestampNtz.fromMillis(1698235273182L), 5); writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(1698235273182L, 45678), 1); - writer.setNullAt(18); + writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(1698235273182L, 123456), 6); indexedRow.pointTo(writer.segment(), 0, writer.position()); return indexedRow; } @@ -118,7 +118,7 @@ class PaimonKeyEncoderTest { binaryRowWriter.writeTimestamp(15, Timestamp.fromEpochMillis(1698235273182L), 1); binaryRowWriter.writeTimestamp(16, Timestamp.fromEpochMillis(1698235273182L), 5); binaryRowWriter.writeTimestamp(17, Timestamp.fromEpochMillis(1698235273182L, 45678), 1); - binaryRowWriter.setNullAt(18); + binaryRowWriter.writeTimestamp(18, Timestamp.fromEpochMillis(1698235273182L, 123456), 6); binaryRowWriter.complete(); return binaryRow; } diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java index 
8594cf224..5ee50776a 100644 --- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java @@ -750,6 +750,20 @@ public class DataTestUtils { assertLogRecordsEquals(DEFAULT_SCHEMA_ID, rowType, logRecords, expectedValue, schemaGetter); } + public static void assertLogRecordsEquals( + RowType rowType, + LogRecords logRecords, + List<Object[]> expectedValue, + ChangeType expectedChangeType, + SchemaGetter schemaGetter) { + List<Tuple2<ChangeType, Object[]>> expectedValueWithRowKind = + expectedValue.stream() + .map(val -> Tuple2.of(expectedChangeType, val)) + .collect(Collectors.toList()); + assertLogRecordsEqualsWithRowKind( + DEFAULT_SCHEMA_ID, rowType, logRecords, expectedValueWithRowKind, schemaGetter); + } + public static void assertLogRecordsEquals( int schemaId, RowType rowType, diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index dc05efd83..b85487a50 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -246,6 +246,9 @@ message PutKvResponse { message LookupRequest { required int64 table_id = 1; repeated PbLookupReqForBucket buckets_req = 2; + optional bool insert_if_not_exists = 3; + optional int32 acks = 4; + optional int32 timeout_ms = 5; } message LookupResponse { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 0eee04c6c..0c711f4f3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -328,6 +328,12 @@ public final class KvTablet { short latestSchemaId = (short) schemaInfo.getSchemaId(); validateSchemaId(kvRecords.schemaId(), latestSchemaId); + AutoIncrementUpdater currentAutoIncrementUpdater = + autoIncrementManager.getUpdaterForSchema(kvFormat, 
latestSchemaId); + + // Validate targetColumns doesn't contain auto-increment column + validateTargetColumns(targetColumns, currentAutoIncrementUpdater, latestSchema); + // Determine the row merger based on mergeMode: // - DEFAULT: Use the configured merge engine (rowMerger) // - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger @@ -339,8 +345,6 @@ public final class KvTablet { targetColumns, latestSchemaId, latestSchema) : rowMerger.configureTargetColumns( targetColumns, latestSchemaId, latestSchema); - AutoIncrementUpdater currentAutoIncrementUpdater = - autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId); RowType latestRowType = latestSchema.getRowType(); WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType); @@ -407,6 +411,21 @@ public final class KvTablet { } } + private void validateTargetColumns( + int[] targetColumns, AutoIncrementUpdater autoIncrementUpdater, Schema schema) { + if (!autoIncrementUpdater.hasAutoIncrement() || targetColumns == null) { + return; + } + List<String> autoIncrementColumnNames = schema.getAutoIncrementColumnNames(); + for (int colIdx : targetColumns) { + if (autoIncrementColumnNames.contains(schema.getColumnName(colIdx))) { + throw new IllegalArgumentException( + "targetColumns must not include auto-increment column name: " + + schema.getColumnName(colIdx)); + } + } + } + private void processKvRecords( KvRecordBatch kvRecords, short schemaIdOfNewData, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index a9bfde8e2..d1010918e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -34,11 +34,13 @@ import org.apache.fluss.exception.UnsupportedVersionException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.LogFormat; 
import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.MetricNames; import org.apache.fluss.metrics.groups.MetricGroup; +import org.apache.fluss.record.KeyRecordBatch; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.record.ProjectionPushdownCache; @@ -123,6 +125,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -138,6 +141,7 @@ import java.util.stream.Stream; import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2; import static org.apache.fluss.server.TabletManagerBase.getTableInfo; import static org.apache.fluss.utils.FileUtils.isDirectoryEmpty; +import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkState; import static org.apache.fluss.utils.concurrent.LockUtils.inLock; @@ -580,6 +584,53 @@ public class ReplicaManager { timeoutMs, requiredAcks, entriesPerBucket.size(), kvPutResult, responseCallback); } + /** Context for tracking missing keys that need to be inserted. */ + public static class MissingKeysContext { + final List<Integer> missingIndexes; + final List<byte[]> missingKeys; + + MissingKeysContext(List<Integer> missingIndexes, List<byte[]> missingKeys) { + this.missingIndexes = missingIndexes; + this.missingKeys = missingKeys; + } + } + + /** + * Collect missing keys from lookup results for insertion, populating both context and batch + * maps. 
+ */ + private void collectMissingKeysForInsert( + Map<TableBucket, List<byte[]>> entriesPerBucket, + Map<TableBucket, LookupResultForBucket> lookupResults, + Map<TableBucket, MissingKeysContext> missingKeysContextMap, + Map<TableBucket, KvRecordBatch> kvRecordBatchMap) { + for (Map.Entry<TableBucket, List<byte[]>> entry : entriesPerBucket.entrySet()) { + TableBucket tb = entry.getKey(); + LookupResultForBucket lookupResult = lookupResults.get(tb); + if (lookupResult.failed()) { + continue; + } + List<Integer> missingIndexes = new ArrayList<>(); + List<byte[]> missingKeys = new ArrayList<>(); + List<byte[]> requestedKeys = entry.getValue(); + List<byte[]> lookupValues = lookupResult.lookupValues(); + + for (int i = 0; i < requestedKeys.size(); i++) { + if (lookupValues.get(i) == null) { + missingKeys.add(requestedKeys.get(i)); + missingIndexes.add(i); + } + } + if (!missingKeys.isEmpty()) { + missingKeysContextMap.put(tb, new MissingKeysContext(missingIndexes, missingKeys)); + kvRecordBatchMap.put( + tb, + KeyRecordBatch.create( + missingKeys, getReplicaOrException(tb).getTableInfo())); + } + } + } + /** Lookup a single key value. */ @VisibleForTesting protected void lookup(TableBucket tableBucket, byte[] key, Consumer<byte[]> responseCallback) { @@ -598,12 +649,22 @@ public class ReplicaManager { }); } + public void lookups( + Map<TableBucket, List<byte[]>> entriesPerBucket, + short apiVersion, + Consumer<Map<TableBucket, LookupResultForBucket>> responseCallback) { + lookups(false, null, null, entriesPerBucket, apiVersion, responseCallback); + } + /** * Lookup with multi key from leader replica of the buckets. 
* * @param apiVersion the client API version for backward compatibility validation */ public void lookups( + boolean insertIfNotExists, + @Nullable Integer timeoutMs, + @Nullable Integer requiredAcks, Map<TableBucket, List<byte[]>> entriesPerBucket, short apiVersion, Consumer<Map<TableBucket, LookupResultForBucket>> responseCallback) { @@ -633,8 +694,96 @@ public class ReplicaManager { tb, new LookupResultForBucket(tb, ApiError.fromThrowable(e))); } } + + if (insertIfNotExists) { + // Lookup-with-insert-if-not-exists flow: + // 1. Initial lookup may find some keys missing + // 2. For missing keys, we call putKv to insert them + // 3. Concurrent lookups on the same keys may all see them as missing, leading to + // multiple putKv calls for the same keys (becomes UPDATE operations in + // processUpsert) + // 4. Auto-increment columns are excluded from targetColumns to prevent overwriting + checkArgument( + timeoutMs != null && requiredAcks != null, + "timeoutMs and requiredAcks must be set"); + Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert = new HashMap<>(); + Map<TableBucket, KvRecordBatch> produceEntryData = new HashMap<>(); + collectMissingKeysForInsert( + entriesPerBucket, + lookupResultForBucketMap, + entriesPerBucketToInsert, + produceEntryData); + + if (!produceEntryData.isEmpty()) { + // Compute target columns: exclude auto-increment to prevent overwriting + TableBucket firstBucket = produceEntryData.keySet().iterator().next(); + Schema schema = getReplicaOrException(firstBucket).getTableInfo().getSchema(); + + // TODO: Performance optimization: during lookup-with-insert-if-not-exists flow, + // the original key bytes are wrapped in KeyRecordBatch, then during putRecordsToKv + // they are decoded to rows and immediately re-encoded back to key bytes, causing + // redundant encode/decode overhead. 
+ putRecordsToKv( + timeoutMs, + requiredAcks, + produceEntryData, + schema.getPrimaryKeyIndexes(), + MergeMode.DEFAULT, + apiVersion, + (result) -> + responseCallback.accept( + reLookupAndMerge( + result, + entriesPerBucketToInsert, + lookupResultForBucketMap))); + } else { + // All keys exist, directly return lookup results + responseCallback.accept(lookupResultForBucketMap); + } + } else { + responseCallback.accept(lookupResultForBucketMap); + } LOG.debug("Lookup from local kv in {}ms", System.currentTimeMillis() - startTime); - responseCallback.accept(lookupResultForBucketMap); + } + + /** + * Re-lookup missing keys after insert and merge results back into the original lookup result + * map. + */ + private Map<TableBucket, LookupResultForBucket> reLookupAndMerge( + List<PutKvResultForBucket> putKvResultForBucketList, + Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert, + Map<TableBucket, LookupResultForBucket> lookupResultForBucketMap) { + // Collect failed buckets first to avoid unnecessary re-lookup + Set<TableBucket> failedBuckets = new HashSet<>(); + for (PutKvResultForBucket putKvResultForBucket : putKvResultForBucketList) { + if (putKvResultForBucket.failed()) { + TableBucket tb = putKvResultForBucket.getTableBucket(); + failedBuckets.add(tb); + lookupResultForBucketMap.put( + tb, new LookupResultForBucket(tb, putKvResultForBucket.getError())); + } + } + + // Re-lookup only for successful inserts + for (Map.Entry<TableBucket, MissingKeysContext> entry : + entriesPerBucketToInsert.entrySet()) { + TableBucket tb = entry.getKey(); + if (failedBuckets.contains(tb)) { + continue; // Skip buckets that failed during insert + } + MissingKeysContext missingKeysContext = entry.getValue(); + List<byte[]> results = + getReplicaOrException(tb).lookups(missingKeysContext.missingKeys); + LookupResultForBucket lookupResult = lookupResultForBucketMap.get(tb); + for (int i = 0; i < missingKeysContext.missingIndexes.size(); i++) { + lookupResult + 
.lookupValues() + .set(missingKeysContext.missingIndexes.get(i), results.get(i)); + } + } + + return lookupResultForBucketMap; } /** diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 145cd1f0f..df2ab926f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -246,18 +246,29 @@ public final class TabletService extends RpcServiceBase implements TabletServerG public CompletableFuture<LookupResponse> lookup(LookupRequest request) { Map<TableBucket, List<byte[]>> lookupData = toLookupData(request); Map<TableBucket, LookupResultForBucket> errorResponseMap = new HashMap<>(); - Map<TableBucket, List<byte[]>> interesting = - authorizeRequestData( - READ, lookupData, errorResponseMap, LookupResultForBucket::new); - if (interesting.isEmpty()) { - return CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap)); - } - CompletableFuture<LookupResponse> response = new CompletableFuture<>(); - replicaManager.lookups( - lookupData, - currentSession().getApiVersion(), - value -> response.complete(makeLookupResponse(value, errorResponseMap))); + + if (request.hasInsertIfNotExists() && request.isInsertIfNotExists()) { + authorizeTable(WRITE, request.getTableId()); + replicaManager.lookups( + request.isInsertIfNotExists(), + request.getTimeoutMs(), + request.getAcks(), + lookupData, + currentSession().getApiVersion(), + value -> response.complete(makeLookupResponse(value, errorResponseMap))); + } else { + Map<TableBucket, List<byte[]>> interesting = + authorizeRequestData( + READ, lookupData, errorResponseMap, LookupResultForBucket::new); + if (interesting.isEmpty()) { + return CompletableFuture.completedFuture(makeLookupResponse(errorResponseMap)); + } + replicaManager.lookups( + lookupData, + currentSession().getApiVersion(), + 
value -> response.complete(makeLookupResponse(value, errorResponseMap))); + } return response; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index fc8a4a054..8472964ab 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -26,6 +26,7 @@ import org.apache.fluss.exception.InvalidRequiredAcksException; import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; @@ -41,7 +42,10 @@ import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.record.LogRecords; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.record.TestingSchemaGetter; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.row.encode.ValueDecoder; import org.apache.fluss.row.encode.ValueEncoder; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; import org.apache.fluss.rpc.entity.LimitScanResultForBucket; @@ -92,8 +96,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.fluss.config.ConfigOptions.KV_FORMAT_VERSION_2; @@ -110,6 +118,10 @@ import 
static org.apache.fluss.record.TestData.DATA1_TABLE_ID; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.apache.fluss.record.TestData.DATA3_ROW_TYPE; +import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK_AUTO_INC; +import static org.apache.fluss.record.TestData.DATA3_TABLE_ID_PK_AUTO_INC; +import static org.apache.fluss.record.TestData.DATA3_TABLE_PATH_PK_AUTO_INC; import static org.apache.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static org.apache.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK; @@ -793,6 +805,294 @@ class ReplicaManagerTest extends ReplicaTestBase { }); } + @Test + void testLookupWithInsertIfNotExists() throws Exception { + TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket()); + + CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0}); + + // Scenario 1: All keys missing - should insert and return new values + byte[] key100 = keyEncoder.encodeKey(row(new Object[] {100})); + byte[] key200 = keyEncoder.encodeKey(row(new Object[] {200})); + + List<byte[]> inserted = lookupWithInsert(tb, Arrays.asList(key100, key200)).lookupValues(); + assertThat(inserted).hasSize(2).allMatch(Objects::nonNull); + verifyLookup(tb, key100, inserted.get(0)); + verifyLookup(tb, key200, inserted.get(1)); + + // Verify that corresponding 2 log records were created + FetchLogResultForBucket logResult = fetchLog(tb, 0L); + assertThat(logResult.getHighWatermark()).isEqualTo(2L); + LogRecords records = logResult.records(); + TestingSchemaGetter schemaGetter = + new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA1_SCHEMA_PK); + List<Object[]> expected = Arrays.asList(new Object[] {100, null}, new 
Object[] {200, null}); + assertLogRecordsEquals(DATA1_ROW_TYPE, records, expected, ChangeType.INSERT, schemaGetter); + + // Scenario 2: All keys exist - should return existing values without modification + List<byte[]> existing = lookupWithInsert(tb, Arrays.asList(key100, key200)).lookupValues(); + assertThat(existing).containsExactlyElementsOf(inserted); + + // Verify that no new log records were created + logResult = fetchLog(tb, 0L); + assertThat(logResult.getHighWatermark()).isEqualTo(2L); + + // Scenario 3: Mixed - key100 exists, key300 missing + byte[] key300 = keyEncoder.encodeKey(row(new Object[] {300})); + List<byte[]> mixed = lookupWithInsert(tb, Arrays.asList(key100, key300)).lookupValues(); + assertThat(mixed.get(0)).isEqualTo(inserted.get(0)); // existing + assertThat(mixed.get(1)).isNotNull(); // newly inserted + verifyLookup(tb, key300, mixed.get(1)); + + // Verify that only one new log record was created for key300 + logResult = fetchLog(tb, 2L); + assertThat(logResult.getHighWatermark()).isEqualTo(3L); + records = logResult.records(); + expected = Collections.singletonList(new Object[] {300, null}); + assertLogRecordsEquals(DATA1_ROW_TYPE, records, expected, ChangeType.INSERT, schemaGetter); + } + + @Test + void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception { + TableBucket tb = new TableBucket(DATA3_TABLE_ID_PK_AUTO_INC, 1); + makeKvTableAsLeader( + DATA3_TABLE_ID_PK_AUTO_INC, DATA3_TABLE_PATH_PK_AUTO_INC, tb.getBucket()); + + // Encode only the key field 'a' (index 0) - decoder returns only key fields, not full row + CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA3_ROW_TYPE, new int[] {0}); + + // Lookup missing keys - should insert with auto-generated values for column 'c' + byte[] key1 = keyEncoder.encodeKey(row(new Object[] {100})); + byte[] key2 = keyEncoder.encodeKey(row(new Object[] {200})); + + List<byte[]> inserted = lookupWithInsert(tb, Arrays.asList(key1, key2)).lookupValues(); + 
assertThat(inserted).hasSize(2).allMatch(Objects::nonNull); + + // Decode values to verify auto-increment column values + TestingSchemaGetter schemaGetter = + new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA3_SCHEMA_PK_AUTO_INC); + ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, KvFormat.COMPACTED); + + InternalRow row1 = valueDecoder.decodeValue(inserted.get(0)).row; + InternalRow row2 = valueDecoder.decodeValue(inserted.get(1)).row; + + // Auto-increment values should be sequential + assertThat(row1.getLong(2)).isEqualTo(1L); + assertThat(row2.getLong(2)).isEqualTo(2L); + + // Lookup existing keys - should return same values without modification + List<byte[]> existing = lookupWithInsert(tb, Arrays.asList(key1, key2)).lookupValues(); + assertThat(existing).containsExactlyElementsOf(inserted); + + // Mixed scenario - key1 exists, key3 missing + byte[] key3 = keyEncoder.encodeKey(row(new Object[] {300})); + List<byte[]> mixed = lookupWithInsert(tb, Arrays.asList(key1, key3)).lookupValues(); + assertThat(mixed.get(0)).isEqualTo(inserted.get(0)); // existing unchanged + + InternalRow row3 = valueDecoder.decodeValue(mixed.get(1)).row; + assertThat(row3.getLong(2)).isEqualTo(3L); // continues sequence + + FetchLogResultForBucket logResult = fetchLog(tb, 0L); + assertThat(logResult.getHighWatermark()).isEqualTo(3L); + LogRecords records = logResult.records(); + List<Object[]> expected = + Arrays.asList( + new Object[] {100, null, 1L}, + new Object[] {200, null, 2L}, + new Object[] {300, null, 3L}); + TestingSchemaGetter schemaGetter2 = + new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA3_SCHEMA_PK_AUTO_INC); + assertLogRecordsEquals(DATA3_ROW_TYPE, records, expected, ChangeType.INSERT, schemaGetter2); + } + + // FIXME: concurrent lookups can race and issue redundant putKv calls for the same missing + // keys (the duplicates become UPDATEs in processUpsert); the WAL-size bounds asserted + // below (between 3 and 15 records) account for this — tighten once deduplicated + @Test + void testConcurrentLookupWithInsertIfNotExistsAutoIncrement() throws Exception { + TableBucket tb = new TableBucket(DATA3_TABLE_ID_PK_AUTO_INC, 1); + makeKvTableAsLeader( + DATA3_TABLE_ID_PK_AUTO_INC, DATA3_TABLE_PATH_PK_AUTO_INC,
tb.getBucket()); + + CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA3_ROW_TYPE, new int[] {0}); + + // Create keys that all threads will look up + byte[] key100 = keyEncoder.encodeKey(row(new Object[] {100})); + byte[] key200 = keyEncoder.encodeKey(row(new Object[] {200})); + byte[] key300 = keyEncoder.encodeKey(row(new Object[] {300})); + List<byte[]> keys = Arrays.asList(key100, key200, key300); + + int numThreads = 3; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + + // Store results from each thread + @SuppressWarnings("unchecked") + List<byte[]>[] threadResults = new List[numThreads]; + + for (int i = 0; i < numThreads; i++) { + final int threadIndex = i; + executor.submit( + () -> { + try { + // Wait for all threads to be ready + startLatch.await(); + // Perform concurrent lookupWithInsert + LookupResultForBucket result = lookupWithInsert(tb, keys); + threadResults[threadIndex] = result.lookupValues(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + doneLatch.countDown(); + } + }); + } + + // Start all threads simultaneously + startLatch.countDown(); + // Wait for all threads to complete + assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue(); + executor.shutdown(); + + // Verify all threads received non-null values + for (int i = 0; i < numThreads; i++) { + assertThat(threadResults[i]) + .as("Thread %d results should not be null", i) + .isNotNull() + .hasSize(3) + .allMatch(Objects::nonNull); + } + + // Verify all threads received the same values for each key (consistency check) + for (int keyIndex = 0; keyIndex < 3; keyIndex++) { + byte[] referenceValue = threadResults[0].get(keyIndex); + for (int threadIndex = 1; threadIndex < numThreads; threadIndex++) { + assertThat(threadResults[threadIndex].get(keyIndex)) + .as( + "Thread %d should have same value as thread 0 for key 
index %d", + threadIndex, keyIndex) + .isEqualTo(referenceValue); + } + } + + // Verify auto-increment values are sequential and unique + TestingSchemaGetter schemaGetter = + new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA3_SCHEMA_PK_AUTO_INC); + ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, KvFormat.COMPACTED); + + Set<Long> autoIncrementValues = new HashSet<>(); + for (byte[] value : threadResults[0]) { + InternalRow row = valueDecoder.decodeValue(value).row; + autoIncrementValues.add(row.getLong(2)); + } + + // Should have exactly 3 unique auto-increment values + assertThat(autoIncrementValues).hasSize(3); + + // Values should be 1, 2, 3 (in any order due to concurrency) + assertThat(autoIncrementValues).containsExactlyInAnyOrder(1L, 2L, 3L); + + // Verify WAL: 3 INSERTs minimum, up to 15 if all concurrent updates execute + FetchLogResultForBucket logResult = fetchLog(tb, 0L); + assertThat(logResult.getHighWatermark()).isBetween(3L, 15L); + + // Decode all WAL records + LogRecords records = logResult.records(); + LogRecordReadContext readContext = + LogRecordReadContext.createArrowReadContext( + DATA3_ROW_TYPE, + DEFAULT_SCHEMA_ID, + new TestingSchemaGetter(DEFAULT_SCHEMA_ID, DATA3_SCHEMA_PK_AUTO_INC)); + + List<Tuple2<ChangeType, Long>> recordsWithAutoInc = new ArrayList<>(); + for (LogRecordBatch batch : records.batches()) { + try (CloseableIterator<LogRecord> iterator = batch.records(readContext)) { + while (iterator.hasNext()) { + LogRecord record = iterator.next(); + recordsWithAutoInc.add( + Tuple2.of(record.getChangeType(), record.getRow().getLong(2))); + } + } + } + + // First 3 must be INSERTs with unique auto-increment values (1, 2, 3) + assertThat(recordsWithAutoInc).hasSizeGreaterThanOrEqualTo(3); + Set<Long> insertAutoIncs = new HashSet<>(); + for (int i = 0; i < 3; i++) { + assertThat(recordsWithAutoInc.get(i).f0).isEqualTo(ChangeType.INSERT); + insertAutoIncs.add(recordsWithAutoInc.get(i).f1); + } + 
assertThat(insertAutoIncs).containsExactlyInAnyOrder(1L, 2L, 3L); + + // Remaining 12 must be UPDATE pairs (UPDATE_BEFORE + UPDATE_AFTER) preserving + // auto-increment + for (int i = 3; i < recordsWithAutoInc.size(); i++) { + Tuple2<ChangeType, Long> record = recordsWithAutoInc.get(i); + assertThat(record.f0).isIn(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER); + assertThat(record.f1).isIn(1L, 2L, 3L); + } + } + + @Test + void testLookupWithInsertIfNotExistsMultiBucket() throws Exception { + TableBucket tb0 = new TableBucket(DATA1_TABLE_ID_PK, 0); + TableBucket tb1 = new TableBucket(DATA1_TABLE_ID_PK, 1); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb0.getBucket()); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb1.getBucket()); + + CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0}); + byte[] key0 = keyEncoder.encodeKey(row(new Object[] {0})); + byte[] key1 = keyEncoder.encodeKey(row(new Object[] {1})); + + verifyLookup(tb0, key0, null); + verifyLookup(tb1, key1, null); + + Map<TableBucket, List<byte[]>> requestMap = new HashMap<>(); + requestMap.put(tb0, Collections.singletonList(key0)); + requestMap.put(tb1, Collections.singletonList(key1)); + + // Insert missing keys across buckets + CompletableFuture<Map<TableBucket, LookupResultForBucket>> future = + new CompletableFuture<>(); + replicaManager.lookups(true, 20000, 1, requestMap, LOOKUP_KV_VERSION, future::complete); + Map<TableBucket, LookupResultForBucket> inserted = future.get(5, TimeUnit.SECONDS); + + byte[] value0 = inserted.get(tb0).lookupValues().get(0); + byte[] value1 = inserted.get(tb1).lookupValues().get(0); + + // Verify inserted values via lookup + verifyLookup(tb0, key0, value0); + verifyLookup(tb1, key1, value1); + } + + private LookupResultForBucket lookupWithInsert(TableBucket tb, List<byte[]> keys) + throws Exception { + CompletableFuture<Map<TableBucket, LookupResultForBucket>> future = + new CompletableFuture<>(); + 
replicaManager.lookups( + true, + 20000, + 1, + Collections.singletonMap(tb, keys), + LOOKUP_KV_VERSION, + future::complete); + LookupResultForBucket result = future.get(5, TimeUnit.SECONDS).get(tb); + assertThat(result.failed()).isFalse(); + return result; + } + + private FetchLogResultForBucket fetchLog(TableBucket tb, long fetchOffset) throws Exception { + CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> future = + new CompletableFuture<>(); + replicaManager.fetchLogRecords( + buildFetchParams(-1, Integer.MAX_VALUE), + Collections.singletonMap( + tb, new FetchReqInfo(tb.getTableId(), fetchOffset, Integer.MAX_VALUE)), + null, + future::complete); + return future.get().get(tb); + } + @Test void testPrefixLookup() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_prefix_lookup_t1"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index 559353df6..3fb01a7e2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -106,6 +106,10 @@ import static org.apache.fluss.record.TestData.DATA2_SCHEMA; import static org.apache.fluss.record.TestData.DATA2_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA2_TABLE_ID; import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH; +import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK_AUTO_INC; +import static org.apache.fluss.record.TestData.DATA3_TABLE_DESCRIPTOR_PK_AUTO_INC; +import static org.apache.fluss.record.TestData.DATA3_TABLE_ID_PK_AUTO_INC; +import static org.apache.fluss.record.TestData.DATA3_TABLE_PATH_PK_AUTO_INC; import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; import static org.apache.fluss.server.replica.ReplicaManager.HIGH_WATERMARK_CHECKPOINT_FILE_NAME; 
import static org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH; @@ -266,6 +270,12 @@ public class ReplicaTestBase { DATA2_TABLE_PATH, TableRegistration.newTable(DATA2_TABLE_ID, DATA2_TABLE_DESCRIPTOR)); zkClient.registerFirstSchema(DATA2_TABLE_PATH, DATA2_SCHEMA); + + zkClient.registerTable( + DATA3_TABLE_PATH_PK_AUTO_INC, + TableRegistration.newTable( + DATA3_TABLE_ID_PK_AUTO_INC, DATA3_TABLE_DESCRIPTOR_PK_AUTO_INC)); + zkClient.registerFirstSchema(DATA3_TABLE_PATH_PK_AUTO_INC, DATA3_SCHEMA_PK_AUTO_INC); } protected long registerTableInZkClient( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 5a28f9e2b..1137e499a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -20,6 +20,7 @@ package org.apache.fluss.server.tablet; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.InvalidRequiredAcksException; +import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.Schema; @@ -30,7 +31,9 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.DefaultKvRecordBatch; import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.TestingSchemaGetter; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.encode.CompactedKeyEncoder; +import org.apache.fluss.row.encode.ValueDecoder; import org.apache.fluss.row.encode.ValueEncoder; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.FetchLogResponse; @@ -973,4 +976,100 @@ public class TabletServiceITCase { return 
ServerRpcMessageUtils.makeNotifyLeaderAndIsrRequest( 0, Collections.singletonList(reqForBucket)); } + + @Test + void testLookupWithInsertIfNotExists() throws Exception { + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway gateway = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0}); + + byte[] key1 = keyEncoder.encodeKey(row(new Object[] {100})); + byte[] key2 = keyEncoder.encodeKey(row(new Object[] {200})); + byte[] key3 = keyEncoder.encodeKey(row(new Object[] {300})); + + // Insert missing keys + PbLookupRespForBucket resp1 = + gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, key1, key2)) + .get() + .getBucketsRespAt(0); + assertThat(resp1.hasErrorCode()).isFalse(); + byte[] value1 = resp1.getValuesList().get(0).getValues(); + byte[] value2 = resp1.getValuesList().get(1).getValues(); + assertThat(value1).isNotNull(); + assertThat(value2).isNotNull(); + + // Existing keys return same values + PbLookupRespForBucket resp2 = + gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, key1, key2)) + .get() + .getBucketsRespAt(0); + assertThat(resp2.getValuesList().get(0).getValues()).isEqualTo(value1); + assertThat(resp2.getValuesList().get(1).getValues()).isEqualTo(value2); + + // Mixed: key1 exists, key3 missing + PbLookupRespForBucket resp3 = + gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, key1, key3)) + .get() + .getBucketsRespAt(0); + assertThat(resp3.getValuesList().get(0).getValues()).isEqualTo(value1); + assertThat(resp3.getValuesList().get(1).getValues()).isNotNull(); + } + + @Test + void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception { + TablePath tablePath = 
TablePath.of("test_db_1", "test_auto_increment_t1"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.BIGINT()) + .withComment("auto increment column") + .column("c", DataTypes.STRING()) + .enableAutoIncrement("b") + .primaryKey("a") + .build(); + RowType rowType = schema.getRowType(); + TableDescriptor descriptor = + TableDescriptor.builder().schema(schema).distributedBy(1, "a").build(); + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, descriptor); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway gateway = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(rowType, new int[] {0}); + TestingSchemaGetter schemaGetter = new TestingSchemaGetter(DEFAULT_SCHEMA_ID, schema); + ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, KvFormat.COMPACTED); + + byte[] key1 = keyEncoder.encodeKey(row(new Object[] {100})); + byte[] key2 = keyEncoder.encodeKey(row(new Object[] {200})); + byte[] key3 = keyEncoder.encodeKey(row(new Object[] {300})); + + // Insert key1, key2 - auto-increment: 1, 2 + PbLookupRespForBucket resp1 = + gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, key1, key2)) + .get() + .getBucketsRespAt(0); + assertThat(resp1.hasErrorCode()).isFalse(); + InternalRow row1 = valueDecoder.decodeValue(resp1.getValuesList().get(0).getValues()).row; + InternalRow row2 = valueDecoder.decodeValue(resp1.getValuesList().get(1).getValues()).row; + assertThat(row1.getLong(1)).isEqualTo(1L); + assertThat(row2.getLong(1)).isEqualTo(2L); + + // Mixed: key1 exists (unchanged), key3 missing (gets 3) + PbLookupRespForBucket resp2 = + gateway.lookup(newLookupRequest(tableId, 0, true, 20000, 1, key1, key3)) + .get() + .getBucketsRespAt(0); + InternalRow existingRow = + 
valueDecoder.decodeValue(resp2.getValuesList().get(0).getValues()).row; + InternalRow newRow = valueDecoder.decodeValue(resp2.getValuesList().get(1).getValues()).row; + assertThat(existingRow.getLong(1)).isEqualTo(1L); + assertThat(newRow.getLong(1)).isEqualTo(3L); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java index 5055105e9..41c4ee967 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java @@ -284,6 +284,27 @@ public class RpcMessageTestUtils { return lookupRequest; } + public static LookupRequest newLookupRequest( + long tableId, + int bucketId, + boolean insertIfNotExists, + int timeoutMs, + int acks, + byte[]... keys) { + LookupRequest lookupRequest = + new LookupRequest() + .setTableId(tableId) + .setInsertIfNotExists(insertIfNotExists) + .setTimeoutMs(timeoutMs) + .setAcks(acks); + PbLookupReqForBucket pbLookupReqForBucket = lookupRequest.addBucketsReq(); + pbLookupReqForBucket.setBucketId(bucketId); + for (byte[] key : keys) { + pbLookupReqForBucket.addKey(key); + } + return lookupRequest; + } + public static PrefixLookupRequest newPrefixLookupRequest( long tableId, int bucketId, List<byte[]> prefixKeys) { PrefixLookupRequest prefixLookupRequest = new PrefixLookupRequest().setTableId(tableId); diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 56678e7e8..443359c32 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -337,6 +337,7 @@ <exclude> org.apache.fluss.row.columnar.BytesColumnVector.Bytes </exclude> + <exclude>org.apache.fluss.row.decode.KeyDecoder</exclude> <exclude>org.apache.fluss.row.encode.RowEncoder</exclude> <exclude>org.apache.fluss.row.encode.KeyEncoder</exclude> <exclude>org.apache.fluss.table.*</exclude>
