vinishjail97 commented on code in PR #815:
URL: https://github.com/apache/incubator-xtable/pull/815#discussion_r2897162155
##########
xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java:
##########
@@ -104,7 +104,11 @@ public Stream<InternalDataFile> addStatsToFiles(
private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
Stream<InternalDataFile> files, Map<String, InternalField> nameFieldMap)
{
-    return files.map(
+    // Use the common ForkJoinPool for parquet footer reads; parallelism can be tuned via
+    // -Djava.util.concurrent.ForkJoinPool.common.parallelism.
+    return files
+        .parallel()
Review Comment:
`Stream.parallel()` uses the common `ForkJoinPool`, which is shared across
the entire JVM. Since `readRangeFromParquetMetadata` is a blocking I/O call
(especially on S3), this can starve other parallel streams and
`CompletableFuture.supplyAsync` calls that share the same pool. `ForkJoinPool`
is designed for CPU-bound divide-and-conquer work — for blocking I/O, a
dedicated `ExecutorService` (fixed thread pool) is the right tool. Consider
injecting an executor and using `CompletableFuture.supplyAsync`:
```java
List<CompletableFuture<InternalDataFile>> futures =
    files
        .map(
            file ->
                CompletableFuture.supplyAsync(
                    () -> {
                      HudiFileStats stats =
                          computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
                      return file.toBuilder()
                          .columnStats(stats.getColumnStats())
                          .recordCount(stats.getRowCount())
                          .build();
                    },
                    footerReadExecutor))
        .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return futures.stream().map(CompletableFuture::join);
```
The executor can be a constructor parameter (e.g.,
`Executors.newFixedThreadPool(n)`) so callers control threading.
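As a generic illustration of the suggested pattern (this is not xtable code; `readFooter` and `footerReadExecutor` are stand-in names for the blocking footer read and the injected executor):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FooterReadPoolSketch {
  // Simulated blocking I/O call, standing in for a parquet footer read.
  static long readFooter(int fileId) {
    try {
      Thread.sleep(5); // pretend network latency
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return fileId * 2L;
  }

  public static void main(String[] args) {
    // Dedicated pool so blocking reads never touch the common ForkJoinPool.
    ExecutorService footerReadExecutor = Executors.newFixedThreadPool(8);
    try {
      List<CompletableFuture<Long>> futures =
          IntStream.range(0, 100)
              .mapToObj(i -> CompletableFuture.supplyAsync(() -> readFooter(i), footerReadExecutor))
              .collect(Collectors.toList());
      // Wait for all reads, then collect results in order.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      long sum = futures.stream().mapToLong(CompletableFuture::join).sum();
      System.out.println(sum); // 2 * (0 + 1 + ... + 99) = 9900
    } finally {
      footerReadExecutor.shutdown();
    }
  }
}
```

Sizing the pool (`8` here) is the caller's decision; for S3-bound reads a larger pool than the CPU count is usually reasonable since the threads are mostly parked on I/O.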
Also — is `UTILS` (`ParquetUtils`, the static singleton on line 73)
thread-safe? `readRangeFromParquetMetadata` is now called concurrently from
multiple threads on the same instance. This should be verified before relying
on it here.
##########
xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java:
##########
@@ -199,6 +200,50 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
validateOutput(output);
}
+  @Test
+  void columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe(@TempDir Path tempDir)
+      throws IOException {
+    Path file = tempDir.resolve("tmp.parquet");
+    GenericData genericData = GenericData.get();
+    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                HadoopOutputFile.fromPath(
+                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
+            .withSchema(AVRO_SCHEMA)
+            .withDataModel(genericData)
+            .build()) {
+      for (GenericRecord record : getRecords()) {
+        writer.write(record);
+      }
+    }
+
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
+    HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
+
+    List<InternalDataFile> inputFiles =
+        IntStream.range(0, 200)
+            .mapToObj(
+                i ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.toString())
+                        .columnStats(Collections.emptyList())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .lastModified(1234L)
+                        .fileSizeBytes(4321L)
+                        .recordCount(0)
+                        .build())
+            .collect(Collectors.toList());
+
+    List<InternalDataFile> output =
+        fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());
Review Comment:
nit: Line exceeds style limit. Split:
```java
List<InternalDataFile> output =
fileStatsExtractor
.addStatsToFiles(null, inputFiles.stream(), schema)
.collect(Collectors.toList());
```
##########
xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java:
##########
@@ -199,6 +200,50 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
validateOutput(output);
}
+  @Test
+  void columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe(@TempDir Path tempDir)
+      throws IOException {
+    Path file = tempDir.resolve("tmp.parquet");
+    GenericData genericData = GenericData.get();
+    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                HadoopOutputFile.fromPath(
+                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
+            .withSchema(AVRO_SCHEMA)
+            .withDataModel(genericData)
+            .build()) {
+      for (GenericRecord record : getRecords()) {
+        writer.write(record);
+      }
+    }
+
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
+    HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
+
+    List<InternalDataFile> inputFiles =
+        IntStream.range(0, 200)
+            .mapToObj(
+                i ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.toString())
+                        .columnStats(Collections.emptyList())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .lastModified(1234L)
+                        .fileSizeBytes(4321L)
+                        .recordCount(0)
+                        .build())
+            .collect(Collectors.toList());
+
+    List<InternalDataFile> output =
+        fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());
+
+    assertEquals(200, output.size());
+    assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getRecordCount() == 2));
Review Comment:
The parallel test only checks `getRecordCount() == 2` and
`getColumnStats().size() == 9` — counts only, not actual values. Thread-safety
bugs (e.g., corrupted ranges from shared state) could produce 9 stats with
wrong min/max/null counts and this test wouldn't catch it. Consider asserting
actual stat values on at least one output file. The existing `validateOutput`
method does this thoroughly but currently requires `output.size() == 1` —
either extract the value assertions into a separate helper or call it on
`output.subList(0, 1)` if `validateOutput` is refactored to take a single
`InternalDataFile`.
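The counts-versus-values distinction can be illustrated outside of xtable (everything below is a generic stand-in, not project code): each concurrent task recomputes min/max over shared data, and the test asserts the actual values rather than merely that a result exists, so cross-thread corruption would surface as a failed value check.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class ValueAssertionSketch {
  public static void main(String[] args) throws Exception {
    int[] data = IntStream.range(0, 1000).toArray();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    try {
      // 200 identical tasks, mirroring the 200 input files in the test above.
      List<Future<int[]>> results =
          pool.invokeAll(
              Collections.nCopies(
                  200,
                  (Callable<int[]>)
                      () ->
                          new int[] {
                            IntStream.of(data).min().getAsInt(),
                            IntStream.of(data).max().getAsInt()
                          }));
      for (Future<int[]> r : results) {
        int[] minMax = r.get();
        // Assert the values, not just that 200 results came back.
        if (minMax[0] != 0 || minMax[1] != 999) {
          throw new AssertionError("corrupted stats: " + minMax[0] + "," + minMax[1]);
        }
      }
      System.out.println("all 200 results carry correct min/max");
    } finally {
      pool.shutdown();
    }
  }
}
```

A size-only assertion (`results.size() == 200`) would pass even if a thread-safety bug scrambled the computed ranges; the per-value check is what makes the test meaningful.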
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]