wuchong commented on code in PR #2464:
URL: https://github.com/apache/fluss/pull/2464#discussion_r2740347101
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1283,6 +1286,21 @@ public class ConfigOptions {
"The format of the kv records in kv store. The
default value is `compacted`. "
+ "The supported formats are `compacted`
and `indexed`.");
+    /**
+     * The version of the kv format. This is used for backward compatibility when encoding strategy
+     * changes. Version 1 (absent): Old tables use datalake encoding for both primary key and bucket
+     * key. Version 2: New tables use Fluss encoding for primary key (to support prefix lookup) and
+     * datalake encoding for bucket key (to align with datalake bucket).
+     */
+    public static final ConfigOption<Integer> TABLE_KV_FORMAT_VERSION =
+            key("table.kv.format.version")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The version of the kv format for backward compatibility. "
+                                    + "When absent (old tables), primary key encoding may differ from new tables. "
+                                    + "Version 2 indicates new encoding strategy for datalake tables.");
Review Comment:
Add a detailed explanation for the config option, explaining the behavior for each format version. And please update the documentation.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussLakeTableITCase.java:
##########
@@ -192,23 +195,26 @@ void testPrimaryKeyTable(boolean isPartitioned, boolean isDefaultBucketKey) thro
        Set<InternalRow> allRows =
                writtenRows.values().stream().flatMap(List::stream).collect(Collectors.toSet());
-        // init lookup columns
-        List<String> lookUpColumns;
-        if (isDefaultBucketKey) {
-            lookUpColumns = Arrays.asList("a", "c", "d");
-        } else {
-            lookUpColumns =
-                    isPartitioned ? Arrays.asList("a", "d") : Collections.singletonList("a");
-        }
-        List<InternalRow.FieldGetter> lookUpFieldGetter = new ArrayList<>(lookUpColumns.size());
-        for (int columnIndex : pkTableSchema.getColumnIndexes(lookUpColumns)) {
-            lookUpFieldGetter.add(
-                    InternalRow.createFieldGetter(
-                            pkTableSchema.getRowType().getTypeAt(columnIndex), columnIndex));
-        }
        // lookup
        try (Table table = conn.getTable(tablePath)) {
-            Lookuper lookuper = table.newLookup().lookupBy(lookUpColumns).createLookuper();
+            // init lookup columns
+            List<String> lookUpColumns;
+            Lookuper lookuper;
+            if (isDefaultBucketKey) {
+                lookUpColumns = Arrays.asList("a", "c", "d");
+                lookuper = table.newLookup().createLookuper();
+            } else {
+                lookUpColumns =
+                        isPartitioned ? Arrays.asList("a", "d") : Collections.singletonList("a");
+                lookuper = table.newLookup().lookupBy(lookUpColumns).createLookuper();
Review Comment:
Currently, in the new version, `lookupBy` only supports prefix keys, but this is not aligned with the method name.
We should check that `lookupColumnNames` is a valid prefix key and then create a `PrefixKeyLookup`; if it is a primary key, create a `PrimaryKeyLookup`; otherwise, throw an exception to let the user know the lookup columns are not valid.
After this change, we don't need this change in the test, which also provides backward compatibility.
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1283,6 +1286,21 @@ public class ConfigOptions {
"The format of the kv records in kv store. The
default value is `compacted`. "
+ "The supported formats are `compacted`
and `indexed`.");
+    /**
+     * The version of the kv format. This is used for backward compatibility when encoding strategy
+     * changes. Version 1 (absent): Old tables use datalake encoding for both primary key and bucket
+     * key. Version 2: New tables use Fluss encoding for primary key (to support prefix lookup) and
+     * datalake encoding for bucket key (to align with datalake bucket).
+     */
+    public static final ConfigOption<Integer> TABLE_KV_FORMAT_VERSION =
+            key("table.kv.format.version")
Review Comment:
I suggest using `table.kv.format-version` instead of `table.kv.format.version`.
Since `table.kv.format` is already defined as a string value (e.g., `compacted`), using a dot separator would create a namespace conflict in the config hierarchy. Using a hyphen keeps the parameters flat and ensures backward compatibility with the existing parser.
Besides, Iceberg also uses `format-version` for its version configuration.
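To see the conflict concretely, here is a self-contained sketch of how a hierarchical parser turns dotted keys into a tree (names and behavior are illustrative, not Fluss's actual parser): the dot-separated version key collides with the existing scalar `table.kv.format`, while the hyphenated key does not:

```java
import java.util.HashMap;
import java.util.Map;

public class DottedKeyConflictDemo {
    // Builds a nested tree from dotted config keys, the way hierarchical
    // parsers interpret them; a sketch for illustration only.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toTree(Map<String, String> flat) {
        Map<String, Object> root = new HashMap<>();
        for (Map.Entry<String, String> e : flat.entrySet()) {
            String[] parts = e.getKey().split("\\.");
            Map<String, Object> node = root;
            for (int i = 0; i < parts.length - 1; i++) {
                Object child = node.get(parts[i]);
                if (child == null) {
                    child = new HashMap<String, Object>();
                    node.put(parts[i], child);
                } else if (!(child instanceof Map)) {
                    // e.g. "table.kv.format" already holds the scalar "compacted",
                    // so "table.kv.format.version" cannot nest under it
                    throw new IllegalStateException(
                            "Key prefix '" + parts[i] + "' already holds a scalar value");
                }
                node = (Map<String, Object>) child;
            }
            String leaf = parts[parts.length - 1];
            if (node.get(leaf) instanceof Map) {
                throw new IllegalStateException("Key '" + leaf + "' already holds a subtree");
            }
            node.put(leaf, e.getValue());
        }
        return root;
    }
}
```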
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java:
##########
@@ -44,8 +44,8 @@ public enum ApiKeys {
UPDATE_METADATA(1013, 0, 0, PRIVATE),
PRODUCE_LOG(1014, 0, 0, PUBLIC),
FETCH_LOG(1015, 0, 0, PUBLIC),
- PUT_KV(1016, 0, 0, PUBLIC),
- LOOKUP(1017, 0, 0, PUBLIC),
+ PUT_KV(1016, 0, 1, PUBLIC),
+ LOOKUP(1017, 0, 1, PUBLIC),
Review Comment:
Add comments to explain the version bump and version behaviors.
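A sketch of what the commented entries could look like, re-created as a standalone enum. The constructor shape (id, lowest supported version, highest supported version) and the comment wording are assumptions for illustration:

```java
public enum ApiKeysSketch {
    // highest version bumped 0 -> 1: v1 requests/responses carry the key bytes
    // for kv-format-version-2 tables, and the server rejects v0 clients for
    // such tables (see validateClientVersionForPkTable in this PR).
    PUT_KV(1016, 0, 1),
    LOOKUP(1017, 0, 1);

    final int id;
    final int lowestVersion;
    final int highestVersion;

    ApiKeysSketch(int id, int lowestVersion, int highestVersion) {
        this.id = id;
        this.lowestVersion = lowestVersion;
        this.highestVersion = highestVersion;
    }
}
```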
##########
fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java:
##########
@@ -26,19 +27,87 @@
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Optional;
/** An interface for encoding key of row into bytes. */
public interface KeyEncoder {
/** Encode the key of given row to byte array. */
byte[] encodeKey(InternalRow row);
+    // -------------------------------------------------------------------------
+    // Factory Methods
+    // -------------------------------------------------------------------------
+
    /**
-     * Create a key encoder to encode the key array bytes of the input row.
+     * Creates a primary key encoder for the given table configuration.
+     *
+     * <p><b>Backward Compatibility:</b> For tables created before the introduction of kv format
+     * version (legacy tables without kvFormatVersion), we continue to use the original encoding
+     * method (lake's encoder for lake tables) to ensure data compatibility.
+     *
+     * <p><b>New Tables (kvFormatVersion = 2):</b> For new tables, we cannot always use the lake's
+     * encoder because some lake encoders (e.g., Paimon) don't support prefix lookup. Prefix lookup
+     * requires the bucket key bytes encoded as a prefix of the primary key bytes encoded, which
+     * Paimon's encoding format does not guarantee. To solve this, new tables use Fluss's own {@link
+     * CompactedKeyEncoder} which ensures the bucket key bytes encoded is always a prefix of the
+     * primary key bytes encoded.
+     *
+     * <p><b>Optimization for Default Bucket Key:</b> Prefix lookup is only needed when the bucket
+     * key is a subset of the primary key. If {@code isDefaultBucketKey} is true (bucket key equals
+     * primary key), prefix lookup is not supported, so we can directly use the lake's encoder. This
+     * also provides a performance benefit: since bucket calculation always requires encoding the
+     * bucket key using the lake's encoder (to ensure correct bucketing for the lake), when the
+     * primary key uses the same lake's encoder, the encoded primary key bytes can be directly
+     * reused for bucket calculation, saving one encoding operation.
     *
     * @param rowType the row type of the input row
-     * @param keyFields the key fields to encode
+     * @param keyFields the primary key fields to encode
+     * @param tableConfig the table configuration containing kv format version and lake format
+     * @param isDefaultBucketKey true if bucket key equals primary key (no prefix lookup supported)
+     * @return the primary key encoder
+     */
+    static KeyEncoder ofPrimaryKeyEncoder(
+            RowType rowType,
+            List<String> keyFields,
+            TableConfig tableConfig,
+            boolean isDefaultBucketKey) {
+        Optional<Integer> kvFormatVersion = tableConfig.getKvFormatVersion();
+        DataLakeFormat dataLakeFormat = tableConfig.getDataLakeFormat().orElse(null);
+        if (!kvFormatVersion.isPresent()) {
+            return of(rowType, keyFields, dataLakeFormat);
+        }
+        int version = kvFormatVersion.get();
+        if (version == 2) {
+            if (isDefaultBucketKey) {
+                return of(rowType, keyFields, dataLakeFormat);
+            } else {
+                // use CompactedKeyEncoder to support prefix look up
+                return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
+            }
+        }
Review Comment:
We should also fall back to the legacy behavior if the format version is `1`, as your config option explains.
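A sketch of the branch structure with the requested fallback, using plain string values in place of the real encoder types (`"lake"` stands for the encoder chosen by `KeyEncoder.of` from the datalake format, `"compacted"` for `CompactedKeyEncoder`); the constant names mirror those used elsewhere in the PR:

```java
public class KvFormatDispatch {
    static final int KV_FORMAT_VERSION_1 = 1;
    static final int KV_FORMAT_VERSION_2 = 2;

    // Which encoder family to pick for the primary key.
    // A null kvFormatVersion models an absent option (legacy tables).
    static String chooseEncoder(Integer kvFormatVersion, boolean isDefaultBucketKey) {
        if (kvFormatVersion == null || kvFormatVersion == KV_FORMAT_VERSION_1) {
            // absent or version 1: legacy behavior, keep the lake encoding
            return "lake";
        }
        if (kvFormatVersion == KV_FORMAT_VERSION_2) {
            // version 2: Fluss encoding for the primary key unless the bucket
            // key equals the primary key (no prefix lookup possible anyway)
            return isDefaultBucketKey ? "lake" : "compacted";
        }
        throw new UnsupportedOperationException(
                "Unsupported kv format version: " + kvFormatVersion);
    }
}
```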
##########
fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java:
##########
@@ -26,19 +27,87 @@
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Optional;
/** An interface for encoding key of row into bytes. */
public interface KeyEncoder {
/** Encode the key of given row to byte array. */
byte[] encodeKey(InternalRow row);
+    // -------------------------------------------------------------------------
+    // Factory Methods
+    // -------------------------------------------------------------------------
+
    /**
-     * Create a key encoder to encode the key array bytes of the input row.
+     * Creates a primary key encoder for the given table configuration.
+     *
+     * <p><b>Backward Compatibility:</b> For tables created before the introduction of kv format
+     * version (legacy tables without kvFormatVersion), we continue to use the original encoding
+     * method (lake's encoder for lake tables) to ensure data compatibility.
+     *
+     * <p><b>New Tables (kvFormatVersion = 2):</b> For new tables, we cannot always use the lake's
+     * encoder because some lake encoders (e.g., Paimon) don't support prefix lookup. Prefix lookup
+     * requires the bucket key bytes encoded as a prefix of the primary key bytes encoded, which
+     * Paimon's encoding format does not guarantee. To solve this, new tables use Fluss's own {@link
+     * CompactedKeyEncoder} which ensures the bucket key bytes encoded is always a prefix of the
+     * primary key bytes encoded.
+     *
+     * <p><b>Optimization for Default Bucket Key:</b> Prefix lookup is only needed when the bucket
+     * key is a subset of the primary key. If {@code isDefaultBucketKey} is true (bucket key equals
+     * primary key), prefix lookup is not supported, so we can directly use the lake's encoder. This
+     * also provides a performance benefit: since bucket calculation always requires encoding the
+     * bucket key using the lake's encoder (to ensure correct bucketing for the lake), when the
+     * primary key uses the same lake's encoder, the encoded primary key bytes can be directly
+     * reused for bucket calculation, saving one encoding operation.
     *
     * @param rowType the row type of the input row
-     * @param keyFields the key fields to encode
+     * @param keyFields the primary key fields to encode
+     * @param tableConfig the table configuration containing kv format version and lake format
+     * @param isDefaultBucketKey true if bucket key equals primary key (no prefix lookup supported)
+     * @return the primary key encoder
+     */
+    static KeyEncoder ofPrimaryKeyEncoder(
+            RowType rowType,
+            List<String> keyFields,
+            TableConfig tableConfig,
+            boolean isDefaultBucketKey) {
+        Optional<Integer> kvFormatVersion = tableConfig.getKvFormatVersion();
+        DataLakeFormat dataLakeFormat = tableConfig.getDataLakeFormat().orElse(null);
+        if (!kvFormatVersion.isPresent()) {
+            return of(rowType, keyFields, dataLakeFormat);
+        }
+        int version = kvFormatVersion.get();
+        if (version == 2) {
+            if (isDefaultBucketKey) {
+                return of(rowType, keyFields, dataLakeFormat);
+            } else {
+                // use CompactedKeyEncoder to support prefix look up
+                return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
+            }
+        }
+        throw new UnsupportedOperationException("Unsupported kv format version: " + version);
+    }
+
+    /**
+     * Creates a bucket key encoder for bucket calculation with a custom row type.
+     *
+     * @param rowType the row type of the input row
+     * @param keyFields the bucket key fields to encode
     * @param lakeFormat the datalake format
+     * @return the bucket key encoder
+     */
+    static KeyEncoder ofBucketKeyEncoder(
+            RowType rowType, List<String> keyFields, @Nullable DataLakeFormat lakeFormat) {
+        return of(rowType, keyFields, lakeFormat);
+    }
+
+    /**
+     * Creates a key encoder based on the datalake format.
+     *
+     * @param rowType the row type of the input row
+     * @param keyFields the key fields to encode
+     * @param lakeFormat the datalake format, null means no datalake
+     */
static KeyEncoder of(
Review Comment:
nit: This is not intended to be used externally. However, private static interface methods are only supported since Java 9, and we still provide Java 8 compatibility, so we can't make this method private here. Could you add a @Deprecated annotation and a comment on this method?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -492,6 +494,27 @@ private TableDescriptor applySystemDefaults(
newDescriptor = newDescriptor.withProperties(newProperties);
}
}
+
+        if (newDescriptor.hasPrimaryKey()) {
+            Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
+            String formatVersion = newProperties.get(ConfigOptions.TABLE_KV_FORMAT_VERSION.key());
+            if (formatVersion != null) {
+                throw new IllegalArgumentException(
+                        "Manual configuration of '"
+                                + ConfigOptions.TABLE_KV_FORMAT_VERSION.key()
+                                + "' is not supported. The coordinator chooses the appropriate "
+                                + "KV format version automatically. Remove this property from the "
+                                + "table properties (current value: '"
Review Comment:
Do we really need to be so strict? I think the format version should be user-configurable, similar to how Apache Iceberg handles its format version.
Right now, we have a critical bug in version 1 that justifies enforcing a specific version, but in the future, version evolution may primarily introduce advanced features rather than fix correctness issues. In such cases, users shouldn't be forced to adopt a particular version; they should be free to choose based on their needs and risk tolerance.
Relaxing this restriction would also let us construct use cases with older format versions, which is beneficial for building compatibility test cases.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -305,10 +310,16 @@ void testPutAndPrefixLookup() throws Exception {
createTable(tablePath, descriptor, false);
Table table = conn.getTable(tablePath);
TableInfo tableInfo = table.getTableInfo();
- verifyPutAndLookup(table, new Object[] {1, "a", 1L, "value1"});
- verifyPutAndLookup(table, new Object[] {1, "a", 2L, "value2"});
- verifyPutAndLookup(table, new Object[] {1, "a", 3L, "value3"});
- verifyPutAndLookup(table, new Object[] {2, "a", 4L, "value4"});
+
+        // We use strings with length > 7 (e.g., "valuevalue1") to properly test prefix lookup.
+        // Previously, short strings like "value1" hid a bug: Paimon's encoder stores strings
+        // longer than 7 characters in a variable-length area, which breaks prefix lookup since
+        // the encoded bucket key bytes are no longer a prefix of the encoded primary key bytes.
+        // Using longer strings ensures we catch such issues.
+ verifyPutAndLookup(table, new Object[] {1, "a", 1L, "valuevalue1"});
+ verifyPutAndLookup(table, new Object[] {1, "a", 2L, "valuevalue2"});
+ verifyPutAndLookup(table, new Object[] {1, "a", 3L, "valuevalue3"});
+ verifyPutAndLookup(table, new Object[] {2, "a", 4L, "valuevalue4"});
Review Comment:
I tried to run this test on the `main` branch, and I can't reproduce the problem. I think that's because `c` is not part of the prefix key, so the encoded bytes of the prefix key are unchanged and can still fetch the result. To reproduce the problem, we should make the value length of column `b` larger than `7`.
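To illustrate why the 7-character threshold matters, here is a toy model of a BinaryRow-style encoder (deliberately simplified, not Paimon's actual layout): each field gets an 8-byte fixed slot, and strings longer than 7 bytes spill into a variable-length area appended after all slots, which destroys the byte-prefix property between a prefix-key row and the full-key row:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class CompactEncodingDemo {
    // Toy layout: per field one 8-byte slot; <=7-byte strings are inlined
    // (length byte + data), longer ones get a marker/offset slot and their
    // bytes go to a variable-length area appended after all fixed slots.
    static byte[] encode(List<String> fields) {
        byte[] fixed = new byte[fields.size() * 8];
        byte[] variable = new byte[0];
        for (int i = 0; i < fields.size(); i++) {
            byte[] data = fields.get(i).getBytes(StandardCharsets.UTF_8);
            int slot = i * 8;
            if (data.length <= 7) {
                fixed[slot] = (byte) data.length;
                System.arraycopy(data, 0, fixed, slot + 1, data.length);
            } else {
                fixed[slot] = (byte) 0xFF; // marker: stored out of line
                fixed[slot + 1] = (byte) variable.length; // offset into variable area
                byte[] grown = Arrays.copyOf(variable, variable.length + data.length);
                System.arraycopy(data, 0, grown, variable.length, data.length);
                variable = grown;
            }
        }
        byte[] out = Arrays.copyOf(fixed, fixed.length + variable.length);
        System.arraycopy(variable, 0, out, fixed.length, variable.length);
        return out;
    }

    static boolean isPrefix(byte[] prefix, byte[] full) {
        if (prefix.length > full.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (prefix[i] != full[i]) {
                return false;
            }
        }
        return true;
    }
}
```

With short values, the encoding of the shorter key stays a byte prefix of the full key's encoding; once a prefix-key string exceeds 7 bytes, its variable-length data in the shorter row occupies the bytes where the longer row places its next fixed slot, and the prefix property is lost.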
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -1746,6 +1768,31 @@ public void shutdown() throws InterruptedException {
checkpointHighWatermarks();
}
+    private void validateClientVersionForPkTable(int apiVersion, TableInfo tableInfo) {
+        if (apiVersion > 0) {
+            return;
+        }
+
+        // in the old version
+        TableConfig tableConfig = tableInfo.getTableConfig();
+        // is with datalake format
+        if (tableConfig.getDataLakeFormat().isPresent()) {
+            Optional<Integer> kvFormatVersion = tableConfig.getKvFormatVersion();
+            if (kvFormatVersion.isPresent()
+                    && kvFormatVersion.get() == KV_FORMAT_VERSION_2
+                    && !tableInfo.isDefaultBucketKey()) {
+                throw new UnsupportedVersionException(
+                        String.format(
+                                "Client API version %d is not supported for table '%s'. "
+                                        + "This table uses new key encoding strategy (kv format version %d). "
+                                        + "Please upgrade your Fluss client to a newer version.",
+                                apiVersion,
+                                tableInfo.getTablePath(),
+                                tableInfo.getTableConfig().getKvFormatVersion().get()));
Review Comment:
```suggestion
kvFormatVersion.get()));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]