Aireed commented on code in PR #3428:
URL: https://github.com/apache/amoro/pull/3428#discussion_r1990727544


##########
amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/InternalMixedIcebergCatalog.java:
##########
@@ -121,6 +115,13 @@ protected Table createChangeStore(
       return tableMetaStore.doAs(() -> 
icebergCatalog.loadTable(changeIdentifier));
     }
 
+    @Override
+    protected TableIdentifier generateChangeStoreIdentifier(TableIdentifier 
baseIdentifier) {
+      return TableIdentifier.of(
+          baseIdentifier.namespace(),
+          baseIdentifier.name() + CHANGE_STORE_SEPARATOR + 
MixedTable.CHANGE_STORE_IDENTIFIER);

Review Comment:
   Should we replace `CHANGE_STORE_SEPARATOR` with `this.separator`?
   ```suggestion
             baseIdentifier.name() + CHANGE_STORE_SEPARATOR + 
MixedTable.CHANGE_STORE_IDENTIFIER);
   ```



##########
amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/command/MigrateToMixedFormatCommand.java:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark.command;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.spark.MixedFormatSparkCatalog;
+import org.apache.amoro.spark.MixedFormatSparkSessionCatalog;
+import org.apache.amoro.spark.table.MixedSparkTable;
+import org.apache.amoro.spark.table.UnkeyedSparkTable;
+import org.apache.amoro.spark.util.MixedFormatSparkUtils;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.V1Table;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * migrate a v1 table to mixed-format table. will reuse file in v1 table , but 
delete metadata in
+ * session catalog
+ */
+public class MigrateToMixedFormatCommand implements MixedFormatSparkCommand {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MigrateToMixedFormatCommand.class);
+
+  private static final String V1TABLE_BACKUP_SUFFIX = "_BAK_MIXED_FORMAT_";
+  protected static final List<String> EXCLUDED_PROPERTIES =
+      ImmutableList.of("path", "transient_lastDdlTime", 
"serialization.format");
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("partition", DataTypes.StringType, false, 
Metadata.empty()),
+            new StructField("file_counts", DataTypes.IntegerType, false, 
Metadata.empty())
+          });
+
+  private final SparkSession spark;
+  private final TableCatalog sourceCatalog;
+  private final Identifier sourceIdentifier;
+  private final Identifier backupV1TableIdentifier;
+  private final TableCatalog targetCatalog;
+  private final Identifier targetIdentifier;
+
+  protected MigrateToMixedFormatCommand(
+      TableCatalog sourceCatalog,
+      Identifier sourceIdentifier,
+      TableCatalog catalog,
+      Identifier identifier,
+      SparkSession spark) {
+    this.spark = spark;
+    this.sourceCatalog = sourceCatalog;
+    this.targetCatalog = catalog;
+    this.targetIdentifier = identifier;
+    this.sourceIdentifier = sourceIdentifier;
+    String backupName = sourceIdentifier.name();
+    backupV1TableIdentifier = Identifier.of(sourceIdentifier.namespace(), 
backupName);
+  }
+
+  @Override
+  public String name() {
+    return "MigrateToMixedFormat";
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public Row[] execute() throws AnalysisException {
+    List<DataFile> dataFiles;
+    TableIdentifier ident;
+    PartitionSpec spec;
+    Schema schema;
+    LOG.info(
+        "start to migrate {} to {}, using temp backup table {}",
+        sourceIdentifier,
+        targetIdentifier,
+        backupV1TableIdentifier);
+    V1Table sourceTable = loadV1Table(sourceCatalog, backupV1TableIdentifier);
+    ident =
+        new TableIdentifier(
+            backupV1TableIdentifier.name(), 
Some.apply(backupV1TableIdentifier.namespace()[0]));
+    dataFiles = loadDataFiles(ident);
+    UnkeyedTable table = createUnkeyedTable(sourceTable);
+
+    spec = table.spec();
+
+    AppendFiles appendFiles = table.newAppend();
+    dataFiles.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+
+    LOG.info(
+        "migrate table {} finished, remove metadata of backup {} table",
+        targetIdentifier,
+        backupV1TableIdentifier);
+
+    if (PartitionSpec.unpartitioned().equals(spec)) {
+      return new Row[] {RowFactory.create("ALL", dataFiles.size())};
+    }
+
+    Map<String, List<DataFile>> partitions = Maps.newHashMap();
+    dataFiles.forEach(
+        d -> {
+          String partition = spec.partitionToPath(d.partition());
+          List<DataFile> df = partitions.computeIfAbsent(partition, p -> 
Lists.newArrayList());
+          df.add(d);
+        });
+    return partitions.keySet().stream()
+        .sorted()
+        .map(p -> RowFactory.create(p, partitions.get(p).size()))
+        .toArray(Row[]::new);
+  }
+
+  private List<DataFile> loadDataFiles(TableIdentifier ident) throws 
AnalysisException {
+    PartitionSpec spec =
+        SparkSchemaUtil.specForTable(spark, ident.database().get() + "." + 
ident.table());
+
+    if (spec.equals(PartitionSpec.unpartitioned())) {
+      return listUnPartitionedSparkTable(spark, ident);
+    } else {
+      List<SparkTableUtil.SparkPartition> sparkPartitions =
+          SparkTableUtil.getPartitions(spark, ident, Maps.newHashMap());
+      Preconditions.checkArgument(
+          !sparkPartitions.isEmpty(), "Cannot find any partitions in table 
%s", ident);
+      return listPartitionDataFiles(spark, sparkPartitions, spec);
+    }
+  }
+
+  private UnkeyedTable createUnkeyedTable(V1Table sourceTable)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.putAll(sourceTable.properties());
+    EXCLUDED_PROPERTIES.forEach(properties::remove);
+    properties.put(TableCatalog.PROP_PROVIDER, "arctic");

Review Comment:
   ```suggestion
       properties.put(TableCatalog.PROP_PROVIDER, "mixed_hive");
   ```



##########
amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/command/MigrateToMixedFormatCommand.java:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark.command;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.spark.MixedFormatSparkCatalog;
+import org.apache.amoro.spark.MixedFormatSparkSessionCatalog;
+import org.apache.amoro.spark.table.MixedSparkTable;
+import org.apache.amoro.spark.table.UnkeyedSparkTable;
+import org.apache.amoro.spark.util.MixedFormatSparkUtils;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.V1Table;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * migrate a v1 table to mixed-format table. will reuse file in v1 table , but 
delete metadata in
+ * session catalog
+ */
+public class MigrateToMixedFormatCommand implements MixedFormatSparkCommand {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MigrateToMixedFormatCommand.class);
+
+  private static final String V1TABLE_BACKUP_SUFFIX = "_BAK_MIXED_FORMAT_";
+  protected static final List<String> EXCLUDED_PROPERTIES =
+      ImmutableList.of("path", "transient_lastDdlTime", 
"serialization.format");
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("partition", DataTypes.StringType, false, 
Metadata.empty()),
+            new StructField("file_counts", DataTypes.IntegerType, false, 
Metadata.empty())
+          });
+
+  private final SparkSession spark;
+  private final TableCatalog sourceCatalog;
+  private final Identifier sourceIdentifier;
+  private final Identifier backupV1TableIdentifier;
+  private final TableCatalog targetCatalog;
+  private final Identifier targetIdentifier;
+
+  protected MigrateToMixedFormatCommand(
+      TableCatalog sourceCatalog,
+      Identifier sourceIdentifier,
+      TableCatalog catalog,
+      Identifier identifier,
+      SparkSession spark) {
+    this.spark = spark;
+    this.sourceCatalog = sourceCatalog;
+    this.targetCatalog = catalog;
+    this.targetIdentifier = identifier;
+    this.sourceIdentifier = sourceIdentifier;
+    String backupName = sourceIdentifier.name();
+    backupV1TableIdentifier = Identifier.of(sourceIdentifier.namespace(), 
backupName);
+  }
+
+  @Override
+  public String name() {
+    return "MigrateToMixedFormat";
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public Row[] execute() throws AnalysisException {
+    List<DataFile> dataFiles;
+    TableIdentifier ident;
+    PartitionSpec spec;
+    Schema schema;
+    LOG.info(
+        "start to migrate {} to {}, using temp backup table {}",
+        sourceIdentifier,
+        targetIdentifier,
+        backupV1TableIdentifier);
+    V1Table sourceTable = loadV1Table(sourceCatalog, backupV1TableIdentifier);
+    ident =
+        new TableIdentifier(
+            backupV1TableIdentifier.name(), 
Some.apply(backupV1TableIdentifier.namespace()[0]));
+    dataFiles = loadDataFiles(ident);
+    UnkeyedTable table = createUnkeyedTable(sourceTable);
+
+    spec = table.spec();
+
+    AppendFiles appendFiles = table.newAppend();
+    dataFiles.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+
+    LOG.info(
+        "migrate table {} finished, remove metadata of backup {} table",
+        targetIdentifier,
+        backupV1TableIdentifier);
+
+    if (PartitionSpec.unpartitioned().equals(spec)) {
+      return new Row[] {RowFactory.create("ALL", dataFiles.size())};
+    }
+
+    Map<String, List<DataFile>> partitions = Maps.newHashMap();
+    dataFiles.forEach(
+        d -> {
+          String partition = spec.partitionToPath(d.partition());
+          List<DataFile> df = partitions.computeIfAbsent(partition, p -> 
Lists.newArrayList());
+          df.add(d);
+        });
+    return partitions.keySet().stream()
+        .sorted()
+        .map(p -> RowFactory.create(p, partitions.get(p).size()))
+        .toArray(Row[]::new);
+  }
+
+  private List<DataFile> loadDataFiles(TableIdentifier ident) throws 
AnalysisException {
+    PartitionSpec spec =
+        SparkSchemaUtil.specForTable(spark, ident.database().get() + "." + 
ident.table());
+
+    if (spec.equals(PartitionSpec.unpartitioned())) {
+      return listUnPartitionedSparkTable(spark, ident);
+    } else {
+      List<SparkTableUtil.SparkPartition> sparkPartitions =
+          SparkTableUtil.getPartitions(spark, ident, Maps.newHashMap());
+      Preconditions.checkArgument(
+          !sparkPartitions.isEmpty(), "Cannot find any partitions in table 
%s", ident);
+      return listPartitionDataFiles(spark, sparkPartitions, spec);
+    }
+  }
+
+  private UnkeyedTable createUnkeyedTable(V1Table sourceTable)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.putAll(sourceTable.properties());
+    EXCLUDED_PROPERTIES.forEach(properties::remove);
+    properties.put(TableCatalog.PROP_PROVIDER, "arctic");
+    properties.put("migrated", "true");

Review Comment:
   Add a prefix to indicate that this property is used by Amoro.
   
   
   ```suggestion
       properties.put("amoro.migrated", "true");
   ```



##########
amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/MixedFormatSparkCatalog.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.spark.mixed.MixedSparkCatalogBase;
+import org.apache.amoro.spark.mixed.MixedTableStoreType;
+import org.apache.amoro.spark.table.MixedSparkTable;
+import org.apache.amoro.spark.table.SparkChangeTable;
+import org.apache.amoro.table.BasicUnkeyedTable;
+import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.PrimaryKeySpec;
+import org.apache.amoro.table.TableBuilder;
+import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MixedFormatSparkCatalog extends MixedSparkCatalogBase implements 
SupportsFunctions {
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    checkAndRefreshCatalogMeta();
+    TableIdentifier identifier;
+    MixedTable table;
+    try {
+      if (isInnerTableIdentifier(ident)) {
+        MixedTableStoreType type = MixedTableStoreType.from(ident.name());
+        identifier = buildInnerTableIdentifier(ident);
+        table = catalog.loadTable(identifier);
+        return loadInnerTable(table, type);
+      } else {
+        identifier = buildIdentifier(ident);
+        table = catalog.loadTable(identifier);
+      }
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+    return MixedSparkTable.ofMixedTable(table, catalog, name());
+  }
+
+  private Table loadInnerTable(MixedTable table, MixedTableStoreType type) {
+    if (type != null) {
+      switch (type) {
+        case CHANGE:
+          return new SparkChangeTable(
+              (BasicUnkeyedTable) table.asKeyedTable().changeTable(), false);
+        default:
+          throw new IllegalArgumentException("Unknown inner table type: " + 
type);
+      }
+    } else {
+      throw new IllegalArgumentException("Table does not exist: " + table);
+    }
+  }
+
+  @Override
+  public Table createTable(
+      Identifier ident, StructType schema, Transform[] transforms, Map<String, 
String> properties)
+      throws TableAlreadyExistsException {
+    checkAndRefreshCatalogMeta();
+    properties = Maps.newHashMap(properties);
+    Schema finalSchema = checkAndConvertSchema(schema, properties);
+    TableIdentifier identifier = buildIdentifier(ident);
+    TableBuilder builder = catalog.newTableBuilder(identifier, finalSchema);
+    PartitionSpec spec = Spark3Util.toPartitionSpec(finalSchema, transforms);
+    if (properties.containsKey(TableCatalog.PROP_LOCATION)
+        && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), 
ident)) {
+      properties.remove(TableCatalog.PROP_LOCATION);
+    }
+    try {
+      if (properties.containsKey("primary.keys")) {

Review Comment:
   Should we add a definition in `TableProperties` for "primary.keys"?
   



##########
amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/InternalMixedIcebergCatalog.java:
##########
@@ -43,7 +44,6 @@
 public class InternalMixedIcebergCatalog extends BasicMixedIcebergCatalog {
 
   public static final String CHANGE_STORE_SEPARATOR = "@";

Review Comment:
   `@` is not a valid character for table names in Hive.
   ```suggestion
     public static final String CHANGE_STORE_SEPARATOR = "@";
   ```



##########
amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/MixedFormatSparkCatalog.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.spark;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.spark.mixed.MixedSparkCatalogBase;
+import org.apache.amoro.spark.mixed.MixedTableStoreType;
+import org.apache.amoro.spark.table.MixedSparkTable;
+import org.apache.amoro.spark.table.SparkChangeTable;
+import org.apache.amoro.table.BasicUnkeyedTable;
+import org.apache.amoro.table.KeyedTable;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.PrimaryKeySpec;
+import org.apache.amoro.table.TableBuilder;
+import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MixedFormatSparkCatalog extends MixedSparkCatalogBase implements 
SupportsFunctions {
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    checkAndRefreshCatalogMeta();
+    TableIdentifier identifier;
+    MixedTable table;
+    try {
+      if (isInnerTableIdentifier(ident)) {
+        MixedTableStoreType type = MixedTableStoreType.from(ident.name());
+        identifier = buildInnerTableIdentifier(ident);
+        table = catalog.loadTable(identifier);
+        return loadInnerTable(table, type);
+      } else {
+        identifier = buildIdentifier(ident);
+        table = catalog.loadTable(identifier);
+      }
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+    return MixedSparkTable.ofMixedTable(table, catalog, name());
+  }
+
+  private Table loadInnerTable(MixedTable table, MixedTableStoreType type) {
+    if (type != null) {
+      switch (type) {
+        case CHANGE:
+          return new SparkChangeTable(
+              (BasicUnkeyedTable) table.asKeyedTable().changeTable(), false);
+        default:
+          throw new IllegalArgumentException("Unknown inner table type: " + 
type);
+      }
+    } else {
+      throw new IllegalArgumentException("Table does not exist: " + table);
+    }
+  }
+
+  @Override
+  public Table createTable(
+      Identifier ident, StructType schema, Transform[] transforms, Map<String, 
String> properties)
+      throws TableAlreadyExistsException {
+    checkAndRefreshCatalogMeta();
+    properties = Maps.newHashMap(properties);
+    Schema finalSchema = checkAndConvertSchema(schema, properties);
+    TableIdentifier identifier = buildIdentifier(ident);
+    TableBuilder builder = catalog.newTableBuilder(identifier, finalSchema);
+    PartitionSpec spec = Spark3Util.toPartitionSpec(finalSchema, transforms);
+    if (properties.containsKey(TableCatalog.PROP_LOCATION)
+        && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), 
ident)) {
+      properties.remove(TableCatalog.PROP_LOCATION);
+    }
+    try {
+      if (properties.containsKey("primary.keys")) {
+        PrimaryKeySpec primaryKeySpec =
+            PrimaryKeySpec.fromDescription(finalSchema, 
properties.get("primary.keys"));
+        properties.remove("primary.keys");
+        builder
+            .withPartitionSpec(spec)
+            .withProperties(properties)
+            .withPrimaryKeySpec(primaryKeySpec);
+      } else {
+        builder.withPartitionSpec(spec).withProperties(properties);
+      }
+      MixedTable table = builder.create();
+      return MixedSparkTable.ofMixedTable(table, catalog, name());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistsException("Table " + ident + " already 
exists", Option.apply(e));
+    }
+  }
+
+  private Schema checkAndConvertSchema(StructType schema, Map<String, String> 
properties) {
+    Schema convertSchema;
+    convertSchema = SparkSchemaUtil.convert(schema);
+
+    // schema add primary keys
+    if (properties.containsKey("primary.keys")) {
+      PrimaryKeySpec primaryKeySpec =
+          PrimaryKeySpec.fromDescription(convertSchema, 
properties.get("primary.keys"));
+      List<String> primaryKeys = primaryKeySpec.fieldNames();
+      Set<String> pkSet = new HashSet<>(primaryKeys);
+      Set<Integer> identifierFieldIds = new HashSet<>();
+      List<Types.NestedField> columnsWithPk = new ArrayList<>();
+      convertSchema
+          .columns()
+          .forEach(
+              nestedField -> {
+                if (pkSet.contains(nestedField.name())) {
+                  columnsWithPk.add(nestedField.asRequired());
+                  identifierFieldIds.add(nestedField.fieldId());
+                } else {
+                  columnsWithPk.add(nestedField);
+                }
+              });
+      return new Schema(columnsWithPk, identifierFieldIds);
+    }
+    return convertSchema;
+  }
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws 
NoSuchTableException {
+    TableIdentifier identifier = buildIdentifier(ident);
+    MixedTable table;
+    try {
+      table = catalog.loadTable(identifier);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+    if (table.isUnkeyedTable()) {
+      alterUnKeyedTable(table.asUnkeyedTable(), changes);
+      return MixedSparkTable.ofMixedTable(table, catalog, name());
+    } else if (table.isKeyedTable()) {
+      alterKeyedTable(table.asKeyedTable(), changes);
+      return MixedSparkTable.ofMixedTable(table, catalog, name());
+    }
+    throw new UnsupportedOperationException("Unsupported alter table");
+  }
+
+  private void alterKeyedTable(KeyedTable table, TableChange... changes) {
+    List<TableChange> schemaChanges = new ArrayList<>();
+    List<TableChange> propertyChanges = new ArrayList<>();
+    for (TableChange change : changes) {
+      if (change instanceof ColumnChange) {
+        schemaChanges.add(change);
+      } else if (change instanceof SetProperty) {
+        propertyChanges.add(change);
+      } else if (change instanceof RemoveProperty) {
+        propertyChanges.add(change);
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+    commitKeyedChanges(table, schemaChanges, propertyChanges);
+  }
+
+  private void commitKeyedChanges(
+      KeyedTable table, List<TableChange> schemaChanges, List<TableChange> 
propertyChanges) {
+    if (!schemaChanges.isEmpty()) {
+      Spark3Util.applySchemaChanges(table.updateSchema(), 
schemaChanges).commit();
+    }
+
+    if (!propertyChanges.isEmpty()) {
+      Spark3Util.applyPropertyChanges(table.updateProperties(), 
propertyChanges).commit();
+    }
+  }
+
+  private void alterUnKeyedTable(UnkeyedTable table, TableChange... changes) {
+    SetProperty setLocation = null;
+    SetProperty setSnapshotId = null;
+    SetProperty pickSnapshotId = null;
+    List<TableChange> propertyChanges = new ArrayList<>();
+    List<TableChange> schemaChanges = new ArrayList<>();
+
+    for (TableChange change : changes) {
+      if (change instanceof SetProperty) {
+        TableChange.SetProperty set = (TableChange.SetProperty) change;
+        if (TableCatalog.PROP_LOCATION.equalsIgnoreCase(set.property())) {
+          setLocation = set;
+        } else if ("current-snapshot-id".equalsIgnoreCase(set.property())) {
+          setSnapshotId = set;
+        } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.property())) 
{
+          pickSnapshotId = set;
+        } else if ("sort-order".equalsIgnoreCase(set.property())) {
+          throw new UnsupportedOperationException(
+              "Cannot specify the 'sort-order' because it's a reserved table "
+                  + "property. Please use the command 'ALTER TABLE ... WRITE 
ORDERED BY' to specify write sort-orders.");
+        } else {
+          propertyChanges.add(set);
+        }
+      } else if (change instanceof RemoveProperty) {
+        propertyChanges.add(change);
+      } else if (change instanceof ColumnChange) {
+        schemaChanges.add(change);
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+
+    commitUnKeyedChanges(
+        table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, 
schemaChanges);
+  }
+
+  protected void commitUnKeyedChanges(
+      UnkeyedTable table,
+      SetProperty setLocation,
+      SetProperty setSnapshotId,
+      SetProperty pickSnapshotId,
+      List<TableChange> propertyChanges,
+      List<TableChange> schemaChanges) {
+    // don't allow setting the snapshot and picking a commit at the same time 
because order is
+    // ambiguous and choosing
+    // one order leads to different results
+    Preconditions.checkArgument(
+        setSnapshotId == null || pickSnapshotId == null,
+        "Cannot set the current the current snapshot ID and cherry-pick 
snapshot changes");

Review Comment:
   ```suggestion
           "Cannot set the current snapshot ID and cherry-pick snapshot changes 
at the same time");
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to