Copilot commented on code in PR #1826:
URL: https://github.com/apache/fluss/pull/1826#discussion_r2438983989
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -370,6 +374,39 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
BUCKET_NUM);
}
+ @Test
+ void testCreateLakeEnableTableWithUnsettablePaimonOptions() {
+ Map<String, String> customProperties = new HashMap<>();
+
+ Set<String> keys = new HashSet<>();
+ keys.addAll(PAIMON_UNSETTABLE_OPTIONS);
+ keys.addAll(PAIMON_DEFAULT_OPTIONS.keySet());
Review Comment:
PAIMON_DEFAULT_OPTIONS is an Options instance which does not expose
keySet(); this will not compile. Use PAIMON_DEFAULT_OPTIONS.toMap().keySet() to
obtain the keys.
```suggestion
keys.addAll(PAIMON_DEFAULT_OPTIONS.toMap().keySet());
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -104,4 +135,111 @@ public static List<SchemaChange> toPaimonSchemaChanges(
return schemaChanges;
}
+
+ public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
+ // validate paimon options first
+ validatePaimonOptions(tableDescriptor.getProperties());
+ validatePaimonOptions(tableDescriptor.getCustomProperties());
+
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ Options options = new Options();
+
+ // set default properties
+ setPaimonDefaultProperties(options);
+
+ // When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
+ List<String> bucketKeys = tableDescriptor.getBucketKeys();
+ if (!bucketKeys.isEmpty()) {
+ int numBuckets =
+ tableDescriptor
+ .getTableDistribution()
+
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Bucket count should be
set."));
+ options.set(CoreOptions.BUCKET, numBuckets);
+ options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
+ } else {
+ options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
+ }
+
+ // set schema
+ for (org.apache.fluss.metadata.Schema.Column column :
+ tableDescriptor.getSchema().getColumns()) {
+ String columnName = column.getName();
+ if (SYSTEM_COLUMNS.containsKey(columnName)) {
+ throw new InvalidTableException(
+ "Column "
+ + columnName
+ + " conflicts with a system column name of
paimon table, please rename the column.");
+ }
+ schemaBuilder.column(
+ columnName,
+
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
+ column.getComment().orElse(null));
+ }
+
+ // add system metadata columns to schema
+ for (Map.Entry<String, DataType> systemColumn :
SYSTEM_COLUMNS.entrySet()) {
+ schemaBuilder.column(systemColumn.getKey(),
systemColumn.getValue());
+ }
+
+ // set pk
+ if (tableDescriptor.hasPrimaryKey()) {
+ schemaBuilder.primaryKey(
+
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
+ options.set(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
+ CoreOptions.ChangelogProducer.INPUT.toString());
+ }
+ // set partition keys
+ schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
+
+ // set properties to paimon schema
+ tableDescriptor.getProperties().forEach((k, v) ->
setFlussPropertyToPaimon(k, v, options));
+ tableDescriptor
+ .getCustomProperties()
+ .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
+ schemaBuilder.options(options.toMap());
+ return schemaBuilder.build();
+ }
+
+ private static void validatePaimonOptions(Map<String, String> properties) {
+ properties.forEach(
+ (k, v) -> {
+ String paimonKey = k;
+ if (k.startsWith(PAIMON_CONF_PREFIX)) {
+ paimonKey = k.substring(PAIMON_CONF_PREFIX.length());
+ }
+ if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)
+ ||
PAIMON_DEFAULT_OPTIONS.toMap().containsKey(paimonKey)) {
+ throw new InvalidConfigException(
+ String.format(
+ "The Paimon option %s will be set
automatically by Fluss "
+ + "and should not set
manually.",
Review Comment:
Grammar nit: change 'should not set manually' to 'should not be set
manually' for clarity.
```suggestion
+ "and should not be set manually.",
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -17,26 +17,58 @@
package org.apache.fluss.lake.paimon.utils;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
Review Comment:
PaimonConversions now statically depends on PaimonLakeCatalog via
SYSTEM_COLUMNS while PaimonLakeCatalog depends on PaimonConversions, creating
tight coupling. Consider moving SYSTEM_COLUMNS to a dedicated constants/util
class to avoid circular dependencies and improve modularity.
```suggestion
import static org.apache.fluss.lake.paimon.PaimonLakeConstants.SYSTEM_COLUMNS;
```
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -17,26 +17,58 @@
package org.apache.fluss.lake.paimon.utils;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
/** Utils for conversion between Paimon and Fluss. */
public class PaimonConversions {
+ // for fluss config
+ private static final String FLUSS_CONF_PREFIX = "fluss.";
+ // for paimon config
+ private static final String PAIMON_CONF_PREFIX = "paimon.";
+
+ /** Paimon config options set by Fluss should not be set by users. */
+ @VisibleForTesting public static final Set<String>
PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();
+
+ @VisibleForTesting public static final Options PAIMON_DEFAULT_OPTIONS =
new Options();
+
+ static {
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.CHANGELOG_PRODUCER.key());
+
+ // set partition.legacy-name to false, otherwise paimon will use toString for all types,
+ // which will cause inconsistent partition value for a same binary value
Review Comment:
Improve grammar in the comment: 'for a same binary value' -> 'for the same
binary value'.
```suggestion
// which will cause inconsistent partition value for the same binary value
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org