kingeasternsun commented on a change in pull request #3876:
URL: https://github.com/apache/iceberg/pull/3876#discussion_r790092338



##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +78,131 @@ private TableMigrationUtil() {
   public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
                                              PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
                                              NameMapping mapping) {
-    if (format.contains("avro")) {
-      return listAvroPartition(partition, uri, spec, conf);
-    } else if (format.contains("parquet")) {
-      return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else if (format.contains("orc")) {
-      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
-    }
+    return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
   }
 
-  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
-                                                  PartitionSpec spec, Configuration conf) {
+  public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
+                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
+                                             NameMapping mapping, int parallelism) {
     try {
       Path partition = new Path(partitionUri);
       FileSystem fs = partition.getFileSystem(conf);
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
-            long rowCount = Avro.rowCount(file);
-            Metrics metrics = new Metrics(rowCount, null, null, null, null);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("avro")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
+      List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+              .filter(FileStatus::isFile)
+              .collect(Collectors.toList());
+      DataFile[] datafiles = new DataFile[fileStatus.size()];
+      Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+              .stopOnFailure()
+              .throwFailureWhenFinished();
+
+      if (parallelism > 1) {
+        task.executeWith(migrationService(parallelism));
+      }
+      if (format.contains("avro")) {
+        task.run(index -> {
+          datafiles[index] = getAvroDataFile(fileStatus.get(index), partitionPath, spec, conf);
+        });
+      } else if (format.contains("parquet")) {
+        task.run(index -> {
+          datafiles[index] = getParquetDataFile(fileStatus.get(index), partitionPath, spec, conf, metricsSpec, mapping);
+        });
+      } else if (format.contains("orc")) {
+        task.run(index -> {
+          datafiles[index] = getOrcDataFile(fileStatus.get(index), partitionPath, spec, conf, metricsSpec, mapping);
+        });
+      } else {
+        throw new UnsupportedOperationException("Unknown partition format: " + format);
+      }
+      return Arrays.asList(datafiles);
     } catch (IOException e) {
       throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
     }
   }
 
-  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
+  private static DataFile getAvroDataFile(FileStatus stat, Map<String, String> partitionPath,
+                                          PartitionSpec spec, Configuration conf) {
+    Metrics metrics;
+    try {
+      InputFile file = HadoopInputFile.fromPath(stat.getPath(), conf);
+      long rowCount = Avro.rowCount(file);
+      metrics = new Metrics(rowCount, null, null, null, null);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the footer of the avro file: " + stat.getPath(), e);
+    }
+
+    String partitionKey = spec.fields().stream()
+            .map(PartitionField::name)
+            .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+            .collect(Collectors.joining("/"));
+
+    return DataFiles.builder(spec)
+            .withPath(stat.getPath().toString())
+            .withFormat("avro")
+            .withFileSizeInBytes(stat.getLen())
+            .withMetrics(metrics)
+            .withPartitionPath(partitionKey)
+            .build();
+  }
+
+  private static DataFile getParquetDataFile(FileStatus stat, Map<String, String> partitionPath,
                                              PartitionSpec spec, Configuration conf,
                                              MetricsConfig metricsSpec, NameMapping mapping) {
+    Metrics metrics;
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics;
-            try {
-              ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
-              metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
-            } catch (IOException e) {
-              throw new RuntimeException("Unable to read the footer of the parquet file: " +
-                  stat.getPath(), e);
-            }
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("parquet")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+      InputFile file = HadoopInputFile.fromPath(stat.getPath(), conf);
+      metrics = ParquetUtil.fileMetrics(file, metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the footer of the parquet file: " + stat.getPath(), e);
     }
+
+    String partitionKey = spec.fields().stream()

Review comment:
       Ok, I also pulled the `DataFiles.builder` out.
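The hunk above references a `migrationService(parallelism)` helper that is not shown in this excerpt. As a rough sketch of what such a factory could look like, assuming it only needs to build a fixed-size pool of daemon threads sized by the caller's `parallelism` (the class name, thread naming, and pool settings here are illustrative assumptions, not the PR's actual implementation):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class MigrationServiceSketch {
  private static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);

  // Fixed-size pool sized by the requested parallelism. Daemon threads keep a
  // hung listing task from blocking JVM shutdown once the migration is done.
  static ExecutorService migrationService(int parallelism) {
    return Executors.newFixedThreadPool(parallelism, runnable -> {
      Thread thread = new Thread(runnable, "table-migration-" + THREAD_COUNT.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    });
  }
}
```

With the new overload, a caller opts in to parallel listing by passing a parallelism greater than 1, e.g. `listPartition(partitionValues, uri, "parquet", spec, conf, metricsConfig, mapping, 8)`; existing callers of the seven-argument overload keep the old sequential behavior, since it delegates with a parallelism of 1.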



