Copilot commented on code in PR #1826:
URL: https://github.com/apache/fluss/pull/1826#discussion_r2438983989


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java:
##########
@@ -370,6 +374,39 @@ void testCreateLakeEnabledTableWithAllTypes() throws 
Exception {
                 BUCKET_NUM);
     }
 
+    @Test
+    void testCreateLakeEnableTableWithUnsettablePaimonOptions() {
+        Map<String, String> customProperties = new HashMap<>();
+
+        Set<String> keys = new HashSet<>();
+        keys.addAll(PAIMON_UNSETTABLE_OPTIONS);
+        keys.addAll(PAIMON_DEFAULT_OPTIONS.keySet());

Review Comment:
   PAIMON_DEFAULT_OPTIONS is an Options instance which does not expose 
keySet(); this will not compile. Use PAIMON_DEFAULT_OPTIONS.toMap().keySet() to 
obtain the keys.
   ```suggestion
           keys.addAll(PAIMON_DEFAULT_OPTIONS.toMap().keySet());
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -104,4 +135,111 @@ public static List<SchemaChange> toPaimonSchemaChanges(
 
         return schemaChanges;
     }
+
+    public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
+        // validate paimon options first
+        validatePaimonOptions(tableDescriptor.getProperties());
+        validatePaimonOptions(tableDescriptor.getCustomProperties());
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        Options options = new Options();
+
+        // set default properties
+        setPaimonDefaultProperties(options);
+
+        // When bucket key is undefined, it should use dynamic bucket (bucket 
= -1) mode.
+        List<String> bucketKeys = tableDescriptor.getBucketKeys();
+        if (!bucketKeys.isEmpty()) {
+            int numBuckets =
+                    tableDescriptor
+                            .getTableDistribution()
+                            
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalArgumentException(
+                                                    "Bucket count should be 
set."));
+            options.set(CoreOptions.BUCKET, numBuckets);
+            options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
+        } else {
+            options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
+        }
+
+        // set schema
+        for (org.apache.fluss.metadata.Schema.Column column :
+                tableDescriptor.getSchema().getColumns()) {
+            String columnName = column.getName();
+            if (SYSTEM_COLUMNS.containsKey(columnName)) {
+                throw new InvalidTableException(
+                        "Column "
+                                + columnName
+                                + " conflicts with a system column name of 
paimon table, please rename the column.");
+            }
+            schemaBuilder.column(
+                    columnName,
+                    
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
+                    column.getComment().orElse(null));
+        }
+
+        // add system metadata columns to schema
+        for (Map.Entry<String, DataType> systemColumn : 
SYSTEM_COLUMNS.entrySet()) {
+            schemaBuilder.column(systemColumn.getKey(), 
systemColumn.getValue());
+        }
+
+        // set pk
+        if (tableDescriptor.hasPrimaryKey()) {
+            schemaBuilder.primaryKey(
+                    
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
+            options.set(
+                    CoreOptions.CHANGELOG_PRODUCER.key(),
+                    CoreOptions.ChangelogProducer.INPUT.toString());
+        }
+        // set partition keys
+        schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
+
+        // set properties to paimon schema
+        tableDescriptor.getProperties().forEach((k, v) -> 
setFlussPropertyToPaimon(k, v, options));
+        tableDescriptor
+                .getCustomProperties()
+                .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
+        schemaBuilder.options(options.toMap());
+        return schemaBuilder.build();
+    }
+
+    private static void validatePaimonOptions(Map<String, String> properties) {
+        properties.forEach(
+                (k, v) -> {
+                    String paimonKey = k;
+                    if (k.startsWith(PAIMON_CONF_PREFIX)) {
+                        paimonKey = k.substring(PAIMON_CONF_PREFIX.length());
+                    }
+                    if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)
+                            || 
PAIMON_DEFAULT_OPTIONS.toMap().containsKey(paimonKey)) {
+                        throw new InvalidConfigException(
+                                String.format(
+                                        "The Paimon option %s will be set 
automatically by Fluss "
+                                                + "and should not set 
manually.",

Review Comment:
   Grammar nit: change 'should not set manually' to 'should not be set 
manually' for clarity.
   ```suggestion
                                                   + "and should not be set 
manually.",
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -17,26 +17,58 @@
 
 package org.apache.fluss.lake.paimon.utils;
 
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;

Review Comment:
   PaimonConversions now statically depends on PaimonLakeCatalog via 
SYSTEM_COLUMNS while PaimonLakeCatalog depends on PaimonConversions, creating 
tight coupling. Consider moving SYSTEM_COLUMNS to a dedicated constants/util 
class to avoid circular dependencies and improve modularity.
   ```suggestion
   import static 
org.apache.fluss.lake.paimon.PaimonLakeConstants.SYSTEM_COLUMNS;
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -17,26 +17,58 @@
 
 package org.apache.fluss.lake.paimon.utils;
 
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
 
 /** Utils for conversion between Paimon and Fluss. */
 public class PaimonConversions {
 
+    // for fluss config
+    private static final String FLUSS_CONF_PREFIX = "fluss.";
+    // for paimon config
+    private static final String PAIMON_CONF_PREFIX = "paimon.";
+
+    /** Paimon config options set by Fluss should not be set by users. */
+    @VisibleForTesting public static final Set<String> 
PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();
+
+    @VisibleForTesting public static final Options PAIMON_DEFAULT_OPTIONS = 
new Options();
+
+    static {
+        PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
+        PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+        PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.CHANGELOG_PRODUCER.key());
+
+        // set partition.legacy-name to false, otherwise paimon will use 
toString for all types,
+        // which will cause inconsistent partition value for a same binary 
value

Review Comment:
   Improve grammar in the comment: 'for a same binary value' -> 'for the same 
binary value'.
   ```suggestion
           // which will cause inconsistent partition value for the same binary 
value
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to