luoyuxia commented on code in PR #1826:
URL: https://github.com/apache/fluss/pull/1826#discussion_r2438987993
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -17,26 +17,58 @@
package org.apache.fluss.lake.paimon.utils;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
/** Utils for conversion between Paimon and Fluss. */
public class PaimonConversions {
+ // for fluss config
+ private static final String FLUSS_CONF_PREFIX = "fluss.";
+ // for paimon config
+ private static final String PAIMON_CONF_PREFIX = "paimon.";
+
+ /** Paimon config options set by Fluss should not be set by users. */
+ @VisibleForTesting public static final Set<String>
PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();
+
+ @VisibleForTesting public static final Options PAIMON_DEFAULT_OPTIONS =
new Options();
+
+ static {
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key());
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key());
+ PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.CHANGELOG_PRODUCER.key());
Review Comment:
I think we can allow `CHANGELOG_PRODUCER` to be changed by users, in case some
users don't want Fluss to generate a changelog. WDYT?
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -17,26 +17,58 @@
package org.apache.fluss.lake.paimon.utils;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.InvalidConfigException;
+import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.paimon.FlussDataTypeToPaimonDataType;
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.lake.paimon.PaimonLakeCatalog.SYSTEM_COLUMNS;
/** Utils for conversion between Paimon and Fluss. */
public class PaimonConversions {
+ // for fluss config
+ private static final String FLUSS_CONF_PREFIX = "fluss.";
+ // for paimon config
+ private static final String PAIMON_CONF_PREFIX = "paimon.";
+
+ /** Paimon config options set by Fluss should not be set by users. */
+ @VisibleForTesting public static final Set<String>
PAIMON_UNSETTABLE_OPTIONS = new HashSet<>();
+
+ @VisibleForTesting public static final Options PAIMON_DEFAULT_OPTIONS =
new Options();
Review Comment:
I suggest removing `PAIMON_DEFAULT_OPTIONS`, putting
`PARTITION_GENERATE_LEGCY_NAME` into `PAIMON_UNSETTABLE_OPTIONS`,
and hard-coding
```
options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
```
directly. Introducing two option collections makes it hard for me to figure out what they mean.
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java:
##########
@@ -104,4 +135,111 @@ public static List<SchemaChange> toPaimonSchemaChanges(
return schemaChanges;
}
+
+ public static Schema toPaimonSchema(TableDescriptor tableDescriptor) {
+ // validate paimon options first
+ validatePaimonOptions(tableDescriptor.getProperties());
+ validatePaimonOptions(tableDescriptor.getCustomProperties());
+
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ Options options = new Options();
+
+ // set default properties
+ setPaimonDefaultProperties(options);
+
+ // When bucket key is undefined, it should use dynamic bucket (bucket
= -1) mode.
+ List<String> bucketKeys = tableDescriptor.getBucketKeys();
+ if (!bucketKeys.isEmpty()) {
+ int numBuckets =
+ tableDescriptor
+ .getTableDistribution()
+
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Bucket count should be
set."));
+ options.set(CoreOptions.BUCKET, numBuckets);
+ options.set(CoreOptions.BUCKET_KEY, String.join(",", bucketKeys));
+ } else {
+ options.set(CoreOptions.BUCKET, CoreOptions.BUCKET.defaultValue());
+ }
+
+ // set schema
+ for (org.apache.fluss.metadata.Schema.Column column :
+ tableDescriptor.getSchema().getColumns()) {
+ String columnName = column.getName();
+ if (SYSTEM_COLUMNS.containsKey(columnName)) {
+ throw new InvalidTableException(
+ "Column "
+ + columnName
+ + " conflicts with a system column name of
paimon table, please rename the column.");
+ }
+ schemaBuilder.column(
+ columnName,
+
column.getDataType().accept(FlussDataTypeToPaimonDataType.INSTANCE),
+ column.getComment().orElse(null));
+ }
+
+ // add system metadata columns to schema
+ for (Map.Entry<String, DataType> systemColumn :
SYSTEM_COLUMNS.entrySet()) {
+ schemaBuilder.column(systemColumn.getKey(),
systemColumn.getValue());
+ }
+
+ // set pk
+ if (tableDescriptor.hasPrimaryKey()) {
+ schemaBuilder.primaryKey(
+
tableDescriptor.getSchema().getPrimaryKey().get().getColumnNames());
+ options.set(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
+ CoreOptions.ChangelogProducer.INPUT.toString());
+ }
+ // set partition keys
+ schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
+
+ // set properties to paimon schema
+ tableDescriptor.getProperties().forEach((k, v) ->
setFlussPropertyToPaimon(k, v, options));
+ tableDescriptor
+ .getCustomProperties()
+ .forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
+ schemaBuilder.options(options.toMap());
+ return schemaBuilder.build();
+ }
+
+ private static void validatePaimonOptions(Map<String, String> properties) {
+ properties.forEach(
+ (k, v) -> {
+ String paimonKey = k;
+ if (k.startsWith(PAIMON_CONF_PREFIX)) {
+ paimonKey = k.substring(PAIMON_CONF_PREFIX.length());
+ }
+ if (PAIMON_UNSETTABLE_OPTIONS.contains(paimonKey)
+ ||
PAIMON_DEFAULT_OPTIONS.toMap().containsKey(paimonKey)) {
+ throw new InvalidConfigException(
+ String.format(
+ "The Paimon option %s will be set
automatically by Fluss "
+ + "and should not set
manually.",
+ k));
+ }
+ });
+ }
+
+ private static void setPaimonDefaultProperties(Options options) {
Review Comment:
nit:
`convertFlussPropertyKeyToPaimon`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]