JingsongLi commented on code in PR #6455:
URL: https://github.com/apache/paimon/pull/6455#discussion_r2458495888


##########
paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java:
##########
@@ -47,13 +69,281 @@ public DataEvolutionFileStoreScan(
                 false);
     }
 
+    @Override
+    public FileStoreScan dropStats() {
+        this.dropStats = true;
+        return this;
+    }
+
+    @Override
+    public FileStoreScan keepStats() {
+        this.dropStats = false;
+        return this;
+    }
+
     public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+        this.inputFilter = predicate;
         return this;
     }
 
+    @Override
+    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+        if (inputFilter == null) {
+            return entries;
+        }
+        List<List<FakeDataFileMeta>> splitByRowId =
+                DataEvolutionSplitGenerator.split(
+                        
entries.stream().map(FakeDataFileMeta::new).collect(Collectors.toList()));
+
+        return splitByRowId.stream()
+                .filter(this::filterByStats)
+                .flatMap(s -> s.stream().map(r -> r.entry))
+                .map(entry -> dropStats ? dropStats(entry) : entry)
+                .collect(Collectors.toList());
+    }
+
+    private boolean filterByStats(List<FakeDataFileMeta> metas) {
+        long rowCount = metas.get(0).rowCount();
+        SimpleStatsEvolution.Result evolutionResult = evolutionStats(metas);
+        return inputFilter.test(
+                rowCount,
+                evolutionResult.minValues(),
+                evolutionResult.maxValues(),
+                evolutionResult.nullCounts());
+    }
+
+    private SimpleStatsEvolution.Result evolutionStats(List<FakeDataFileMeta> 
metas) {
+        int[] allFields = 
schema.fields().stream().mapToInt(DataField::id).toArray();
+        int fieldsCount = schema.fields().size();
+        int[] rowOffsets = new int[fieldsCount];
+        int[] fieldOffsets = new int[fieldsCount];
+        Arrays.fill(rowOffsets, -1);
+        Arrays.fill(fieldOffsets, -1);
+
+        InternalRow[] min = new InternalRow[metas.size()];
+        InternalRow[] max = new InternalRow[metas.size()];
+        BinaryArray[] nullCounts = new BinaryArray[metas.size()];
+
+        for (int i = 0; i < metas.size(); i++) {
+            SimpleStats stats = metas.get(i).valueStats();
+            min[i] = stats.minValues();
+            max[i] = stats.maxValues();
+            nullCounts[i] = stats.nullCounts();
+        }
+
+        for (int i = 0; i < metas.size(); i++) {
+            FakeDataFileMeta fileMeta = metas.get(i);
+            TableSchema dataFileSchema =
+                    scanTableSchema(fileMeta.schemaId())
+                            .project(
+                                    fileMeta.valueStatsCols() == null
+                                            ? fileMeta.writeCols()
+                                            : fileMeta.valueStatsCols());
+            int[] fieldIds =
+                    
SpecialFields.rowTypeWithRowTracking(dataFileSchema.logicalRowType())
+                            .getFields().stream()
+                            .mapToInt(DataField::id)
+                            .toArray();
+
+            int count = 0;
+            for (int j = 0; j < fieldsCount; j++) {
+                for (int fieldId : fieldIds) {
+                    if (allFields[j] == fieldId) {
+                        // TODO: If type not match (e.g. int -> string), we 
need to skip this, set
+                        // rowOffsets[j] = -1 always. (may -2, after all, set 
it back to -1)
+                        // Because schema evolution may happen to change int 
to string or something
+                        // like that.
+                        if (rowOffsets[j] == -1) {
+                            rowOffsets[j] = i;
+                            fieldOffsets[j] = count++;
+                        }
+                        break;
+                    }
+                }
+            }
+        }
+
+        DataEvolutionRow finalMin = new DataEvolutionRow(metas.size(), 
rowOffsets, fieldOffsets);
+        DataEvolutionRow finalMax = new DataEvolutionRow(metas.size(), 
rowOffsets, fieldOffsets);
+        DataEvolutionArray finalNullCounts =
+                new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets);
+
+        finalMin.setRows(min);
+        finalMax.setRows(max);
+        finalNullCounts.setRows(nullCounts);
+        return new SimpleStatsEvolution.Result(finalMin, finalMax, 
finalNullCounts);
+    }
+
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
         return true;
     }
+
+    private static class FakeDataFileMeta implements DataFileMeta {

Review Comment:
   I don't understand why we need this fake `DataFileMeta` implementation (`FakeDataFileMeta`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to