RussellSpitzer commented on a change in pull request #3876:
URL: https://github.com/apache/iceberg/pull/3876#discussion_r805971174
##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +77,106 @@ private TableMigrationUtil() {
   public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
                                              PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
                                              NameMapping mapping) {
-    if (format.contains("avro")) {
-      return listAvroPartition(partition, uri, spec, conf);
-    } else if (format.contains("parquet")) {
-      return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else if (format.contains("orc")) {
-      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
-    } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
-    }
+    return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
   }
-  private static List<DataFile> listAvroPartition(Map<String, String> partitionPath, String partitionUri,
-                                                  PartitionSpec spec, Configuration conf) {
+  public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
+                                             PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
+                                             NameMapping mapping, int parallelism) {
     try {
+      String partitionKey = spec.fields().stream()
+          .map(PartitionField::name)
+          .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
+          .collect(Collectors.joining("/"));
+
       Path partition = new Path(partitionUri);
       FileSystem fs = partition.getFileSystem(conf);
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
-            long rowCount = Avro.rowCount(file);
-            Metrics metrics = new Metrics(rowCount, null, null, null, null);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("avro")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-
-          }).collect(Collectors.toList());
+      List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .collect(Collectors.toList());
+      DataFile[] datafiles = new DataFile[fileStatus.size()];
+      Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
+          .stopOnFailure()
+          .throwFailureWhenFinished();
+
+      if (parallelism > 1) {
+        task.executeWith(migrationService(parallelism));
+      }
+
+      if (format.contains("avro")) {
+        task.run(index -> {
+          Metrics metrics = getAvroMerics(fileStatus.get(index).getPath(), conf);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
+        });
+      } else if (format.contains("parquet")) {
+        task.run(index -> {
+          Metrics metrics = getParquetMerics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
+        });
+      } else if (format.contains("orc")) {
+        task.run(index -> {
+          Metrics metrics = getOrcMerics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
+          datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
+        });
+      } else {
+        throw new UnsupportedOperationException("Unknown partition format: " + format);
+      }
+      return Arrays.asList(datafiles);
     } catch (IOException e) {
       throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
     }
   }
-  private static List<DataFile> listParquetPartition(Map<String, String> partitionPath, String partitionUri,
-                                                      PartitionSpec spec, Configuration conf,
-                                                      MetricsConfig metricsSpec, NameMapping mapping) {
+  private static Metrics getAvroMerics(Path path, Configuration conf) {
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
-
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics;
-            try {
-              ParquetMetadata metadata = ParquetFileReader.readFooter(conf, stat);
-              metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsSpec, mapping);
-            } catch (IOException e) {
-              throw new RuntimeException("Unable to read the footer of the parquet file: " +
-                  stat.getPath(), e);
-            }
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
-
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("parquet")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
+      InputFile file = HadoopInputFile.fromPath(path, conf);
+      long rowCount = Avro.rowCount(file);
+      return new Metrics(rowCount, null, null, null, null);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read Avro file: " +
+          path, e);
     }
   }
-  private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
-                                                 PartitionSpec spec, Configuration conf,
-                                                 MetricsConfig metricsSpec, NameMapping mapping) {
+  private static Metrics getParquetMerics(Path path, Configuration conf,
+                                          MetricsConfig metricsSpec, NameMapping mapping) {
     try {
-      Path partition = new Path(partitionUri);
-      FileSystem fs = partition.getFileSystem(conf);
+      InputFile file = HadoopInputFile.fromPath(path, conf);
+      return ParquetUtil.fileMetrics(file, metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the metrics of the Parquet file: " +
+          path, e);
+    }
+  }
-      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
-          .filter(FileStatus::isFile)
-          .map(stat -> {
-            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
-                metricsSpec, mapping);
-            String partitionKey = spec.fields().stream()
-                .map(PartitionField::name)
-                .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
-                .collect(Collectors.joining("/"));
+  private static Metrics getOrcMerics(Path path, Configuration conf,
+                                      MetricsConfig metricsSpec, NameMapping mapping) {
+    try {
+      return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, conf),
+          metricsSpec, mapping);
+    } catch (UncheckedIOException e) {
+      throw new RuntimeException("Unable to read the metrics of the Orc file: " +
+          path, e);
+    }
+  }
-            return DataFiles.builder(spec)
-                .withPath(stat.getPath().toString())
-                .withFormat("orc")
-                .withFileSizeInBytes(stat.getLen())
-                .withMetrics(metrics)
-                .withPartitionPath(partitionKey)
-                .build();
+  private static DataFile buildDataFile(FileStatus stat, String partitionKey,
+                                        PartitionSpec spec, Metrics metrics, String format) {
+    return DataFiles.builder(spec)
+        .withPath(stat.getPath().toString())
+        .withFormat(format)
+        .withFileSizeInBytes(stat.getLen())
+        .withMetrics(metrics)
+        .withPartitionPath(partitionKey)
+        .build();
+  }
-          }).collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
-    }
+  private static ExecutorService migrationService(int concurrentDeletes) {
Review comment:
Looking at this, I think we should push this parameter through as an executor service the whole way, so that actions can reuse a service (like in ExpireSnapshots or RemoveOrphans). I think I'll do one PR for the other procedures, then work with @kingeasternsun to modify https://github.com/apache/iceberg/pull/3973/files to fit.

My basic thought process is:
* Actions with parallelism get a `withExecutorService()` method.
* Procedures that use it follow a create service, execute action, close service pattern (see the sketch below).

So we extract the service out of the utility class and into the procedure itself.
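
To make that lifecycle concrete, here is a minimal sketch of the procedure side; the `MigrateAction` interface and `withExecutorService(...)` method are hypothetical placeholders for whatever the action API ends up exposing, not the current Iceberg API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the "create service, execute action, close service" pattern.
// MigrateAction and withExecutorService(...) are placeholders, not the real Iceberg API.
interface MigrateAction {
  MigrateAction withExecutorService(ExecutorService service);
  void execute();
}

class MigrateProcedureSketch {
  void run(MigrateAction migrateAction, int parallelism) {
    ExecutorService service = Executors.newFixedThreadPool(parallelism);  // create service
    try {
      migrateAction
          .withExecutorService(service)  // the action reuses the procedure's pool
          .execute();                    // execute action
    } finally {
      service.shutdown();               // close service: the procedure owns the lifecycle
    }
  }
}
```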
##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +77,106 @@ private TableMigrationUtil() {
+  private static ExecutorService migrationService(int concurrentDeletes) {
Review comment:
I was thwarted in my plans for this particular method. The issue is that we call this on the executors and not on the driver, so we can't pass through an executor service, at least not in a sensible way. I think we just need to close it in the utility method ... not ideal.
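
For reference, a minimal sketch of what closing the pool inside the utility method could look like, reusing the `migrationService(parallelism)` helper and the `Tasks` builder from the diff above; the surrounding variables are those of `listPartition`, so this is illustrative rather than a drop-in change:

```java
// Sketch only: since the caller runs on the executors rather than the driver,
// the utility creates the pool itself and shuts it down once the tasks finish.
ExecutorService service = parallelism > 1 ? migrationService(parallelism) : null;
try {
  if (service != null) {
    task.executeWith(service);
  }
  task.run(index -> {
    // read metrics and build the DataFile for fileStatus.get(index), as in the diff above
  });
} finally {
  if (service != null) {
    service.shutdown();
  }
}
```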
##########
File path: data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
##########
@@ -74,111 +77,106 @@ private TableMigrationUtil() {
+  private static ExecutorService migrationService(int concurrentDeletes) {
Review comment:
#4125
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]