This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new d3bbe5fe Delta source: add record count for DataFile
d3bbe5fe is described below
commit d3bbe5febb6facd7b9dc7aa0994e37125cac91eb
Author: Hanzhi Wang <[email protected]>
AuthorDate: Wed Jan 15 23:12:49 2025 -0800
Delta source: add record count for DataFile
This change improves how the number of records is obtained from the Delta source
and fixes the edge case where the record count was reported as 0 whenever the
columnStats list was empty. A wrapper class, FileStats, is added to hold the list
of ColumnStat and the number of records for each file.
---
.../org/apache/xtable/model/stat/FileStats.java | 36 +++++++++++++++
.../apache/xtable/delta/DeltaActionsConverter.java | 10 ++---
.../apache/xtable/delta/DeltaStatsExtractor.java | 52 ++++++++++++----------
.../xtable/delta/TestDeltaStatsExtractor.java | 29 ++++++++----
4 files changed, 90 insertions(+), 37 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/stat/FileStats.java
b/xtable-api/src/main/java/org/apache/xtable/model/stat/FileStats.java
new file mode 100644
index 00000000..a6e9350e
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/stat/FileStats.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.stat;
+
+import java.util.List;
+
+import lombok.Builder;
+import lombok.Value;
+
+/**
+ * Captures file-level statistics for a single data file: per-column statistics and the record count.
+ *
+ * @since 0.2
+ */
+@Value
+@Builder(toBuilder = true)
+public class FileStats {
+ List<ColumnStat> columnStats; // per-column stats; NOTE(review): stored as given, no defensive copy
+ long numRecords; // number of records in the file
+}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index 40b822df..16a320f1 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -35,6 +35,7 @@ import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
@@ -56,13 +57,10 @@ public class DeltaActionsConverter {
boolean includeColumnStats,
DeltaPartitionExtractor partitionExtractor,
DeltaStatsExtractor fileStatsExtractor) {
+ FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile,
fields);
List<ColumnStat> columnStats =
- includeColumnStats
- ? fileStatsExtractor.getColumnStatsForFile(addFile, fields)
- : Collections.emptyList();
- long recordCount =
-
columnStats.stream().map(ColumnStat::getNumValues).max(Long::compareTo).orElse(0L);
- // TODO(https://github.com/apache/incubator-xtable/issues/102): removed
record count.
+ includeColumnStats ? fileStats.getColumnStats() :
Collections.emptyList();
+ long recordCount = fileStats.getNumRecords();
return InternalDataFile.builder()
.physicalPath(getFullPathToFile(deltaSnapshot, addFile.path()))
.fileFormat(fileFormat)
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
index 75ecce33..a685700e 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java
@@ -55,6 +55,7 @@ import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.stat.Range;
/**
@@ -185,9 +186,9 @@ public class DeltaStatsExtractor {
}
}
- public List<ColumnStat> getColumnStatsForFile(AddFile addFile,
List<InternalField> fields) {
+ public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField>
fields) {
if (StringUtils.isEmpty(addFile.stats())) {
- return Collections.emptyList();
+ return
FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build();
}
// TODO: Additional work needed to track maps & arrays.
try {
@@ -197,27 +198,32 @@ public class DeltaStatsExtractor {
Map<String, Object> fieldPathToMaxValue =
flattenStatMap(deltaStats.getMaxValues());
Map<String, Object> fieldPathToMinValue =
flattenStatMap(deltaStats.getMinValues());
Map<String, Object> fieldPathToNullCount =
flattenStatMap(deltaStats.getNullCount());
- return fields.stream()
- .filter(field -> fieldPathToMaxValue.containsKey(field.getPath()))
- .map(
- field -> {
- String fieldPath = field.getPath();
- Object minValue =
- DeltaValueConverter.convertFromDeltaColumnStatValue(
- fieldPathToMinValue.get(fieldPath), field.getSchema());
- Object maxValue =
- DeltaValueConverter.convertFromDeltaColumnStatValue(
- fieldPathToMaxValue.get(fieldPath), field.getSchema());
- Number nullCount = (Number)
fieldPathToNullCount.get(fieldPath);
- Range range = Range.vector(minValue, maxValue);
- return ColumnStat.builder()
- .field(field)
- .numValues(deltaStats.getNumRecords())
- .numNulls(nullCount.longValue())
- .range(range)
- .build();
- })
- .collect(CustomCollectors.toList(fields.size()));
+ List<ColumnStat> columnStats =
+ fields.stream()
+ .filter(field ->
fieldPathToMaxValue.containsKey(field.getPath()))
+ .map(
+ field -> {
+ String fieldPath = field.getPath();
+ Object minValue =
+ DeltaValueConverter.convertFromDeltaColumnStatValue(
+ fieldPathToMinValue.get(fieldPath),
field.getSchema());
+ Object maxValue =
+ DeltaValueConverter.convertFromDeltaColumnStatValue(
+ fieldPathToMaxValue.get(fieldPath),
field.getSchema());
+ Number nullCount = (Number)
fieldPathToNullCount.get(fieldPath);
+ Range range = Range.vector(minValue, maxValue);
+ return ColumnStat.builder()
+ .field(field)
+ .numValues(deltaStats.getNumRecords())
+ .numNulls(nullCount.longValue())
+ .range(range)
+ .build();
+ })
+ .collect(CustomCollectors.toList(fields.size()));
+ return FileStats.builder()
+ .columnStats(columnStats)
+ .numRecords(deltaStats.getNumRecords())
+ .build();
} catch (IOException ex) {
throw new ParseException("Unable to parse stats json", ex);
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
index db685883..d4d35e7f 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaStatsExtractor.java
@@ -42,6 +42,7 @@ import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.testutil.ColumnStatMapUtil;
@@ -120,11 +121,15 @@ public class TestDeltaStatsExtractor {
List<InternalField> fields = schema.getAllFields();
List<ColumnStat> columnStats = getColumnStats();
+ long numRecords = 50L;
String stats =
- DeltaStatsExtractor.getInstance().convertStatsToDeltaFormat(schema,
50L, columnStats);
+ DeltaStatsExtractor.getInstance()
+ .convertStatsToDeltaFormat(schema, numRecords, columnStats);
AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true,
stats, null, null);
DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
- List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
+ FileStats actual = extractor.getColumnStatsForFile(addFile, fields);
+ List<ColumnStat> actualColumStats = actual.getColumnStats();
+ long actualNumRecords = actual.getNumRecords();
Set<ColumnStat> expected = new HashSet<>();
columnStats.forEach(
@@ -137,7 +142,8 @@ public class TestDeltaStatsExtractor {
expected.add(columnStatWithoutSize);
}
});
- assertEquals(expected, new HashSet<>(actual));
+ assertEquals(expected, new HashSet<>(actualColumStats));
+ assertEquals(numRecords, actualNumRecords);
}
@Test
@@ -147,20 +153,24 @@ public class TestDeltaStatsExtractor {
Map<String, Object> maxValues = generateMap("b", 2, 2.0, 20);
Map<String, Object> nullValues = generateMap(1L, 2L, 3L, 4L);
Map<String, Object> deltaStats = new HashMap<>();
+ long numRecords = 100;
deltaStats.put("minValues", minValues);
deltaStats.put("maxValues", maxValues);
deltaStats.put("nullCount", nullValues);
- deltaStats.put("numRecords", 100);
+ deltaStats.put("numRecords", numRecords);
deltaStats.put("tightBounds", Boolean.TRUE);
deltaStats.put("nonExisting", minValues);
String stats = MAPPER.writeValueAsString(deltaStats);
AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true,
stats, null, null);
DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
- List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
+ FileStats actual = extractor.getColumnStatsForFile(addFile, fields);
+ List<ColumnStat> actualColumStats = actual.getColumnStats();
+ long actualNumRecords = actual.getNumRecords();
Set<String> unsupportedStats = extractor.getUnsupportedStats();
assertEquals(2, unsupportedStats.size());
assertTrue(unsupportedStats.contains("tightBounds"));
assertTrue(unsupportedStats.contains("nonExisting"));
+ assertEquals(numRecords, actualNumRecords);
List<ColumnStat> expected =
Arrays.asList(
@@ -188,7 +198,7 @@ public class TestDeltaStatsExtractor {
.numNulls(4)
.range(Range.vector(10, 20))
.build());
- assertEquals(expected, actual);
+ assertEquals(expected, actualColumStats);
}
@Test
@@ -196,8 +206,11 @@ public class TestDeltaStatsExtractor {
List<InternalField> fields = getSchemaFields();
AddFile addFile = new AddFile("file://path/to/file", null, 0, 0, true,
null, null, null);
DeltaStatsExtractor extractor = DeltaStatsExtractor.getInstance();
- List<ColumnStat> actual = extractor.getColumnStatsForFile(addFile, fields);
- assertEquals(Collections.emptyList(), actual);
+ FileStats actual = extractor.getColumnStatsForFile(addFile, fields);
+ List<ColumnStat> actualColumStats = actual.getColumnStats();
+ long actualNumRecords = actual.getNumRecords();
+ assertEquals(Collections.emptyList(), actualColumStats);
+ assertEquals(0, actualNumRecords);
}
private List<InternalField> getSchemaFields() {