RussellSpitzer commented on a change in pull request #3876:
URL: https://github.com/apache/iceberg/pull/3876#discussion_r790094008
##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +77,104 @@ private TableMigrationUtil() {
public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
NameMapping mapping) {
- if (format.contains("avro")) {
- return listAvroPartition(partition, uri, spec, conf);
- } else if (format.contains("parquet")) {
- return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
- } else if (format.contains("orc")) {
- return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
- } else {
- throw new UnsupportedOperationException("Unknown partition format: " + format);
- }
+ return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
}
- private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
- PartitionSpec spec, Configuration conf) {
+ public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
+ PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
+ NameMapping mapping, int parallelism) {
try {
+ String partitionKey = spec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+ .collect(Collectors.joining("/"));
Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
- return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
- .filter(FileStatus::isFile)
- .map(stat -> {
- InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
- long rowCount = Avro.rowCount(file);
- Metrics metrics = new Metrics(rowCount, null, null, null, null);
- String partitionKey = spec.fields().stream()
- .map(PartitionField::name)
- .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
- .collect(Collectors.joining("/"));
-
- return DataFiles.builder(spec)
- .withPath(stat.getPath().toString())
- .withFormat("avro")
- .withFileSizeInBytes(stat.getLen())
- .withMetrics(metrics)
- .withPartitionPath(partitionKey)
- .build();
-
- }).collect(Collectors.toList());
+ List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+ .filter(FileStatus::isFile)
+ .collect(Collectors.toList());
+ DataFile[] datafiles = new DataFile[fileStatus.size()];
+ Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+ .stopOnFailure()
+ .throwFailureWhenFinished();
+
+ if (parallelism > 1) {
+ task.executeWith(migrationService(parallelism));
+ }
+ if (format.contains("avro")) {
+ task.run(index -> {
+ Metrics metrics = getAvroMerics(fileStatus.get(index), conf);
+ datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
Review comment:
I'm not sure why the format check here uses "contains" instead of "equals",
so it may be right to pass the literal "avro" through rather than reusing
`format`. Maybe someone had a custom parquet format or something.
Let's keep this as is.
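
For illustration, here is a minimal sketch of what a substring check buys over an exact match, assuming callers may pass something longer than the bare format name (the class-like string used below is hypothetical). `FormatCheckSketch` and `canonicalFormat` are made-up names and are not part of TableMigrationUtil; the sketch only mirrors the avro/parquet/orc order from the diff.

```java
// Hypothetical sketch, not the Iceberg implementation.
final class FormatCheckSketch {
  private FormatCheckSketch() {
  }

  // Maps a possibly decorated format string to the literal name that would be
  // passed on, checking avro, then parquet, then orc as in the quoted diff.
  static String canonicalFormat(String format) {
    if (format.contains("avro")) {
      return "avro";
    } else if (format.contains("parquet")) {
      return "parquet";
    } else if (format.contains("orc")) {
      return "orc";
    } else {
      throw new UnsupportedOperationException("Unknown partition format: " + format);
    }
  }
}
```

With an "equals" check, a value such as "my.custom.parquet.InputFormat" would be rejected as unknown. With "contains" it is accepted, which is also why passing the raw `format` on would hand a non-canonical string to whatever consumes it, while the literal does not.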
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]