wwj6591812 commented on code in PR #5078:
URL: https://github.com/apache/paimon/pull/5078#discussion_r1954527154


##########
paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java:
##########
@@ -83,6 +83,9 @@ public static Content fromId(int id) {
     private final InternalMap lowerBounds;
     private final InternalMap upperBounds;
 
+    // only used for iceberg migrate

Review Comment:
   I think it is better : "only used for migrate iceberg table to paimon with 
schema evolution".



##########
paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java:
##########
@@ -246,10 +277,21 @@ public void renameTable(boolean ignoreIfNotExists) throws 
Exception {
         paimonCatalog.renameTable(targetTableId, sourceTableId, 
ignoreIfNotExists);
     }
 
-    public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) 
{
-        // get iceberg current schema
-        IcebergSchema icebergSchema =
-                
icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+    public List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata 
icebergMetadata) {

Review Comment:
   private



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate from iceberg table to paimon table. */
+public class MigrateIcebergTableAction extends ActionBase {
+
+    private final String sourceTableFullName;
+    private final String tableProperties;
+    private final Integer parallelism;
+
+    private final String icebergProperties;
+
+    public MigrateIcebergTableAction(
+            String sourceTableFullName,
+            Map<String, String> catalogConfig,
+            String icebergProperties,
+            String tableProperties,
+            Integer parallelism) {

Review Comment:
   add @Nullable before parallelism



##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHiveMetadata}. */
+public class IcebergMigrateHiveMetadataFactory implements 
IcebergMigrateMetadataFactory {
+    @Override
+    public String identifier() {
+        return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate";

Review Comment:
   no need call toString()



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateIcebergTableAction}. */
+public class MigrateIcebergTableActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_iceberg_table";

Review Comment:
   private



##########
paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java:
##########
@@ -78,24 +67,25 @@ public String[] call(
             String properties,
             Integer parallelism)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+        String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
 
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
+        Identifier targetTableId = Identifier.fromString(targetTablePath);
 
-        TableMigrationUtils.getImporter(
+        Migrator migrator =
+                TableMigrationUtils.getImporter(
                         connector,
                         catalog,
                         sourceTableId.getDatabaseName(),
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         parallelism,
-                        
ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        
ParameterUtils.parseCommaSeparatedKeyValues(properties));
+        LOG.info("create migrator success.");

Review Comment:
   I think it is better : "create migrator success and begin executeMigrate." 



##########
paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java:
##########
@@ -289,6 +331,18 @@ private void 
checkAndFilterDataFiles(List<IcebergDataFileMeta> icebergDataFileMe
         }
     }
 
+    public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO 
fileIO) {

Review Comment:
   private



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java:
##########
@@ -60,6 +62,27 @@ public static Migrator getImporter(
         }
     }
 
+    public static Migrator getIcebergImporter(
+            Catalog catalog,
+            String sourceDatabase,
+            String sourceTableName,
+            String targetDatabase,
+            String targetTableName,
+            Integer parallelism,
+            Map<String, String> options,

Review Comment:
   no need of arg `options`



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java:
##########
@@ -0,0 +1,90 @@
+/*

Review Comment:
   Should we also modify the doc of these action and procedure in this PR?



##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHiveMetadata}. */
+public class IcebergMigrateHiveMetadataFactory implements 
IcebergMigrateMetadataFactory {
+    @Override

Review Comment:
   add a blank line



##########
paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Get iceberg table latest snapshot metadata in hive. */
+public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class);
+
+    public static final String TABLE_TYPE_PROP = "table_type";

Review Comment:
   private



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to