vinishjail97 commented on code in PR #815:
URL: https://github.com/apache/incubator-xtable/pull/815#discussion_r2897162155


##########
xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java:
##########
@@ -104,7 +104,11 @@ public Stream<InternalDataFile> addStatsToFiles(
 
   private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
       Stream<InternalDataFile> files, Map<String, InternalField> nameFieldMap) {
-    return files.map(
+    // Use the common ForkJoinPool for parquet footer reads; parallelism can be tuned via
+    // -Djava.util.concurrent.ForkJoinPool.common.parallelism.
+    return files
+        .parallel()

Review Comment:
   `Stream.parallel()` uses the common `ForkJoinPool`, which is shared across 
the entire JVM. Since `readRangeFromParquetMetadata` is a blocking I/O call 
(especially on S3), this can starve other parallel streams and 
`CompletableFuture.supplyAsync` calls that share the same pool. `ForkJoinPool` 
is designed for CPU-bound divide-and-conquer work — for blocking I/O, a 
dedicated `ExecutorService` (fixed thread pool) is the right tool. Consider 
injecting an executor and using `CompletableFuture.supplyAsync`:
   ```java
   List<CompletableFuture<InternalDataFile>> futures = files
       .map(file -> CompletableFuture.supplyAsync(() -> {
           HudiFileStats stats =
               computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
           return file.toBuilder()
               .columnStats(stats.getColumnStats())
               .recordCount(stats.getRowCount())
               .build();
       }, footerReadExecutor))
       .collect(Collectors.toList());
   CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
   return futures.stream().map(CompletableFuture::join);
   ```
   The executor can be a constructor parameter (e.g., `Executors.newFixedThreadPool(n)`) so callers control threading.
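   As a self-contained illustration of that pattern (the blocking read is simulated with `Thread.sleep`, and all names here are placeholders, not XTable APIs):
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;

   public class FooterReadDemo {
     // Run n simulated blocking "footer reads" on a dedicated fixed pool instead of
     // the common ForkJoinPool, so other parallel work in the JVM is not starved.
     static List<Integer> readAllFooters(int n, int threads) {
       ExecutorService footerReadExecutor = Executors.newFixedThreadPool(threads);
       try {
         List<CompletableFuture<Integer>> futures =
             IntStream.range(0, n)
                 .mapToObj(
                     i ->
                         CompletableFuture.supplyAsync(
                             () -> {
                               // Stand-in for a blocking I/O call such as a parquet footer read.
                               try {
                                 Thread.sleep(10);
                               } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
                               }
                               return i * 2;
                             },
                             footerReadExecutor))
                 .collect(Collectors.toList());
         // join() preserves submission order regardless of completion order.
         return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
       } finally {
         footerReadExecutor.shutdown();
       }
     }

     public static void main(String[] args) {
       System.out.println(readAllFooters(8, 4)); // prints [0, 2, 4, 6, 8, 10, 12, 14]
     }
   }
   ```
   The `finally { shutdown(); }` only applies if the pool is method-local; a constructor-injected executor should be owned and shut down by the caller.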
   
   Also — is `UTILS` (`ParquetUtils`, the static singleton on line 73) 
thread-safe? `readRangeFromParquetMetadata` is now called concurrently from 
multiple threads on the same instance. This should be verified before relying 
on it here.



##########
xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java:
##########
@@ -199,6 +200,50 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
     validateOutput(output);
   }
 
+  @Test
+  void columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe(@TempDir Path tempDir)
+      throws IOException {
+    Path file = tempDir.resolve("tmp.parquet");
+    GenericData genericData = GenericData.get();
+    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                HadoopOutputFile.fromPath(
+                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
+            .withSchema(AVRO_SCHEMA)
+            .withDataModel(genericData)
+            .build()) {
+      for (GenericRecord record : getRecords()) {
+        writer.write(record);
+      }
+    }
+
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
+    HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
+
+    List<InternalDataFile> inputFiles =
+        IntStream.range(0, 200)
+            .mapToObj(
+                i ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.toString())
+                        .columnStats(Collections.emptyList())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .lastModified(1234L)
+                        .fileSizeBytes(4321L)
+                        .recordCount(0)
+                        .build())
+            .collect(Collectors.toList());
+
+    List<InternalDataFile> output =
+        fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());

Review Comment:
   nit: Line exceeds style limit. Split:
   ```java
   List<InternalDataFile> output =
       fileStatsExtractor
           .addStatsToFiles(null, inputFiles.stream(), schema)
           .collect(Collectors.toList());
   ```



##########
xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java:
##########
@@ -199,6 +200,50 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
     validateOutput(output);
   }
 
+  @Test
+  void columnStatsWithoutMetadataTable_parallelFooterReadsAreThreadSafe(@TempDir Path tempDir)
+      throws IOException {
+    Path file = tempDir.resolve("tmp.parquet");
+    GenericData genericData = GenericData.get();
+    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(
+                HadoopOutputFile.fromPath(
+                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
+            .withSchema(AVRO_SCHEMA)
+            .withDataModel(genericData)
+            .build()) {
+      for (GenericRecord record : getRecords()) {
+        writer.write(record);
+      }
+    }
+
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
+    HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
+
+    List<InternalDataFile> inputFiles =
+        IntStream.range(0, 200)
+            .mapToObj(
+                i ->
+                    InternalDataFile.builder()
+                        .physicalPath(file.toString())
+                        .columnStats(Collections.emptyList())
+                        .fileFormat(FileFormat.APACHE_PARQUET)
+                        .lastModified(1234L)
+                        .fileSizeBytes(4321L)
+                        .recordCount(0)
+                        .build())
+            .collect(Collectors.toList());
+
+    List<InternalDataFile> output =
+        fileStatsExtractor.addStatsToFiles(null, inputFiles.stream(), schema).collect(Collectors.toList());
+
+    assertEquals(200, output.size());
+    assertTrue(output.stream().allMatch(fileWithStats -> fileWithStats.getRecordCount() == 2));

Review Comment:
   The parallel test only checks `getRecordCount() == 2` and 
`getColumnStats().size() == 9` — counts only, not actual values. Thread-safety 
bugs (e.g., corrupted ranges from shared state) could produce 9 stats with 
wrong min/max/null counts and this test wouldn't catch it. Consider asserting 
actual stat values on at least one output file. The existing `validateOutput` 
method does this thoroughly but currently requires `output.size() == 1` — 
either extract the value assertions into a separate helper or call it on 
`output.subList(0, 1)` if `validateOutput` is refactored to take a single 
`InternalDataFile`.
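   For illustration only, a minimal sketch of that extraction (the `FileStats` type and the expected values here are stand-ins, not the real `InternalDataFile` or its actual stats):
   ```java
   import java.util.List;

   public class StatsAssertDemo {
     // Stand-in for a file with per-column stats: (recordCount, min, max, nullCount).
     public static final class FileStats {
       public final long recordCount;
       public final int min, max, nulls;

       public FileStats(long recordCount, int min, int max, int nulls) {
         this.recordCount = recordCount;
         this.min = min;
         this.max = max;
         this.nulls = nulls;
       }
     }

     // Value-level assertions extracted into a helper that takes a single file,
     // so both the single-file test and the 200-file parallel test can reuse it.
     public static void validateFileStats(FileStats f) {
       if (f.recordCount != 2) throw new AssertionError("recordCount");
       if (f.min != 1 || f.max != 2 || f.nulls != 0) throw new AssertionError("stat values");
     }

     public static void validateAll(List<FileStats> output) {
       // A size/count check alone would miss corrupted values; assert values on every file.
       for (FileStats f : output) validateFileStats(f);
     }
   }
   ```
   The point is the shape: once the value assertions take a single file, the parallel test can apply them to all 200 outputs instead of counting stats.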



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
