vinishjail97 commented on code in PR #805:
URL: https://github.com/apache/incubator-xtable/pull/805#discussion_r2839191813


##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java:
##########
@@ -79,78 +73,70 @@ private static Optional<Long> 
getMaxFromColumnStats(List<ColumnStat> columnStats
         .max(Long::compareTo);
   }
 
-  public static Map<ColumnDescriptor, List<ColumnStat>> 
getStatsForFile(ParquetMetadata footer) {
-    Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>();
+  @SuppressWarnings("unchecked")
+  private static final Comparator<Object> COMPARABLE_COMPARATOR =
+      (a, b) -> ((Comparable<Object>) a).compareTo(b);
+
+  private static ColumnStat mergeColumnChunks(
+      List<ColumnChunkMetaData> chunks, InternalSchema internalSchema) {
+    ColumnChunkMetaData first = chunks.get(0);
+    InternalField internalField =
+        SchemaFieldFinder.getInstance()
+            .findFieldByPath(internalSchema, first.getPath().toDotString());
+    PrimitiveType primitiveType = first.getPrimitiveType();
+    long totalNumValues = 
chunks.stream().mapToLong(ColumnChunkMetaData::getValueCount).sum();
+    long totalSize = 
chunks.stream().mapToLong(ColumnChunkMetaData::getTotalSize).sum();
+    Object globalMin =
+        chunks.stream()
+            .map(c -> convertStatsToInternalType(primitiveType, 
c.getStatistics().genericGetMin()))

Review Comment:
   `c.getStatistics().genericGetMin()` returns `null` for all-null columns 
(`hasNonNullValue() == false`). This would NPE inside 
`convertStatsToInternalType`. Consider filtering out chunks where stats have no 
non-null values before the min/max aggregation.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java:
##########
@@ -187,9 +183,9 @@ public InternalTable getCurrentTable() {
   @Override
   public InternalSnapshot getCurrentSnapshot() {
     // to avoid consume the stream call the method twice to return the same 
stream of parquet files
-    Stream<InternalDataFile> internalDataFiles =
-        getInternalDataFiles(getParquetFiles(hadoopConf, basePath));
     InternalTable table = getMostRecentTable(getParquetFiles(hadoopConf, 
basePath));
+    Stream<InternalDataFile> internalDataFiles =
+        getInternalDataFiles(getParquetFiles(hadoopConf, basePath), 
table.getReadSchema());

Review Comment:
   `getParquetFiles()` is called 3 times in `getCurrentSnapshot()` (lines 186, 
188, 193), triggering 3 separate filesystem scans. This is inefficient and 
could yield inconsistent results if files change between calls. The comment on 
line 185 says "call the method twice" but it's actually 3 times now. Consider 
collecting to a list once and creating new streams from it? 



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java:
##########
@@ -65,12 +65,6 @@ public static ParquetStatsExtractor getInstance() {
   private static PathBasedPartitionSpecExtractor partitionSpecExtractor =

Review Comment:
   `partitionValueExtractor` (line 63), `partitionSpecExtractor` (line 65), and 
`getMaxFromColumnStats` (line 68) appear to be dead code after the removal of 
`toInternalDataFile`. These should be cleaned up along with the unused `import 
org.apache.hadoop.fs.*;` on line 32.



##########
xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetStatsExtractor.java:
##########
@@ -20,436 +20,506 @@
 
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
 import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.statistics.BinaryStatistics;
-import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.*;

Review Comment:
   Nit: wildcard import `org.apache.parquet.schema.*;` — and the explicit 
`import org.apache.parquet.schema.MessageType` on line 49 is redundant since 
the wildcard already covers it. Expand the wildcard into explicit imports.



##########
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java:
##########
@@ -79,78 +73,70 @@ private static Optional<Long> 
getMaxFromColumnStats(List<ColumnStat> columnStats
         .max(Long::compareTo);
   }
 
-  public static Map<ColumnDescriptor, List<ColumnStat>> 
getStatsForFile(ParquetMetadata footer) {
-    Map<ColumnDescriptor, List<ColumnStat>> columnDescStats = new HashMap<>();
+  @SuppressWarnings("unchecked")
+  private static final Comparator<Object> COMPARABLE_COMPARATOR =
+      (a, b) -> ((Comparable<Object>) a).compareTo(b);
+
+  private static ColumnStat mergeColumnChunks(
+      List<ColumnChunkMetaData> chunks, InternalSchema internalSchema) {
+    ColumnChunkMetaData first = chunks.get(0);
+    InternalField internalField =
+        SchemaFieldFinder.getInstance()
+            .findFieldByPath(internalSchema, first.getPath().toDotString());

Review Comment:
   `SchemaFieldFinder.findFieldByPath()` returns `null` if the column path 
doesn't exist in the schema (per its Javadoc). This null would silently 
propagate into the `ColumnStat`. Consider adding 
`Objects.requireNonNull(internalField, "No field found for path: " + 
first.getPath().toDotString())`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to