This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f88f128dd9 Core: Generate realistic bounds in benchmarks (#11022)
f88f128dd9 is described below
commit f88f128dd971ceaae071761102f021264bf133a0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Aug 27 13:47:53 2024 -0700
Core: Generate realistic bounds in benchmarks (#11022)
---
.../org/apache/iceberg/FileGenerationUtil.java | 76 +++++++++++++--
.../org/apache/iceberg/TestFileGenerationUtil.java | 108 +++++++++++++++++++++
2 files changed, 176 insertions(+), 8 deletions(-)
diff --git a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
index 98a6eafaf8..e48f23ff9a 100644
--- a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
+++ b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
@@ -19,27 +19,48 @@
package org.apache.iceberg;
import java.nio.ByteBuffer;
+import java.util.Comparator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.MetricsModes.Counts;
+import org.apache.iceberg.MetricsModes.MetricsMode;
+import org.apache.iceberg.MetricsModes.None;
+import org.apache.iceberg.MetricsModes.Truncate;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.RandomUtil;
public class FileGenerationUtil {
private FileGenerationUtil() {}
public static DataFile generateDataFile(Table table, StructLike partition) {
+ return generateDataFile(table, partition, ImmutableMap.of(),
ImmutableMap.of());
+ }
+
+ public static DataFile generateDataFile(
+ Table table,
+ StructLike partition,
+ Map<Integer, ByteBuffer> lowerBounds,
+ Map<Integer, ByteBuffer> upperBounds) {
Schema schema = table.schema();
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();
String path = locations.newDataLocation(spec, partition,
generateFileName());
long fileSize = generateFileSize();
- Metrics metrics = generateRandomMetrics(schema);
+ MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+ Metrics metrics = generateRandomMetrics(schema, metricsConfig,
lowerBounds, upperBounds);
return DataFiles.builder(spec)
.withPath(path)
.withPartition(partition)
@@ -91,7 +112,11 @@ public class FileGenerationUtil {
return String.format("%d-%d-%s-%d.parquet", partitionId, taskId,
operationId, fileCount);
}
- public static Metrics generateRandomMetrics(Schema schema) {
+ public static Metrics generateRandomMetrics(
+ Schema schema,
+ MetricsConfig metricsConfig,
+ Map<Integer, ByteBuffer> knownLowerBounds,
+ Map<Integer, ByteBuffer> knownUpperBounds) {
long rowCount = generateRowCount();
Map<Integer, Long> columnSizes = Maps.newHashMap();
Map<Integer, Long> valueCounts = Maps.newHashMap();
@@ -106,12 +131,16 @@ public class FileGenerationUtil {
valueCounts.put(fieldId, generateValueCount());
nullValueCounts.put(fieldId, (long) random().nextInt(5));
nanValueCounts.put(fieldId, (long) random().nextInt(5));
- byte[] lower = new byte[16];
- random().nextBytes(lower);
- lowerBounds.put(fieldId, ByteBuffer.wrap(lower));
- byte[] upper = new byte[16];
- random().nextBytes(upper);
- upperBounds.put(fieldId, ByteBuffer.wrap(upper));
+ if (knownLowerBounds.containsKey(fieldId) &&
knownUpperBounds.containsKey(fieldId)) {
+ lowerBounds.put(fieldId, knownLowerBounds.get(fieldId));
+ upperBounds.put(fieldId, knownUpperBounds.get(fieldId));
+ } else if (column.type().isPrimitiveType()) {
+ PrimitiveType type = column.type().asPrimitiveType();
+ MetricsMode metricsMode = metricsConfig.columnMode(column.name());
+ Pair<ByteBuffer, ByteBuffer> bounds = generateBounds(type,
metricsMode);
+ lowerBounds.put(fieldId, bounds.first());
+ upperBounds.put(fieldId, bounds.second());
+ }
}
return new Metrics(
@@ -185,6 +214,37 @@ public class FileGenerationUtil {
return random().nextInt(50_000);
}
+  /**
+   * Generates a random lower/upper bound pair for a column of the given type.
+   *
+   * <p>Two random values are drawn via {@link #generateBound} and ordered with the type's
+   * comparator so the lower bound never exceeds the upper bound. When the metrics mode keeps no
+   * bounds (the drawn values are null), both buffers in the returned pair are null.
+   *
+   * @param type the column's primitive type
+   * @param mode the metrics mode configured for the column
+   * @return a pair of serialized (lower, upper) bounds, or a pair of nulls if no bounds are kept
+   */
+  private static Pair<ByteBuffer, ByteBuffer> generateBounds(PrimitiveType type, MetricsMode mode) {
+    Object value1 = generateBound(type, mode);
+    Object value2 = generateBound(type, mode);
+
+    if (value1 == null || value2 == null) {
+      // No bounds for this mode; comparing nulls may throw (e.g. natural-order comparators).
+      return Pair.of(null, null);
+    }
+
+    Comparator<Object> cmp = Comparators.forType(type);
+    boolean swap = cmp.compare(value1, value2) > 0;
+    Object lower = swap ? value2 : value1;
+    Object upper = swap ? value1 : value2;
+    return Pair.of(Conversions.toByteBuffer(type, lower), Conversions.toByteBuffer(type, upper));
+  }
+
+  /**
+   * Generates a single random bound value for a column of the given type.
+   *
+   * <p>{@link None} and {@link Counts} modes keep no bounds, so null is returned for them. In
+   * {@link Truncate} mode only string and binary values are truncated to the configured length.
+   * Metrics truncation shortens variable-length values. Applying the truncate transform to
+   * numeric types would instead round them down to a multiple of the width, producing unrealistic
+   * bounds. All other types and modes keep the full random value.
+   *
+   * @param type the column's primitive type
+   * @param mode the metrics mode configured for the column
+   * @return a random bound value, or null if the mode keeps no bounds
+   */
+  private static Object generateBound(PrimitiveType type, MetricsMode mode) {
+    if (mode instanceof None || mode instanceof Counts) {
+      return null;
+    }
+
+    Object value = RandomUtil.generatePrimitive(type, random());
+
+    boolean truncatable = type instanceof Types.StringType || type instanceof Types.BinaryType;
+    if (mode instanceof Truncate && truncatable) {
+      Transform<Object, Object> truncate = Transforms.truncate(((Truncate) mode).length());
+      return truncate.bind(type).apply(value);
+    }
+
+    return value;
+  }
+
/** Returns the calling thread's random generator, used by the generators in this class. */
private static Random random() {
  Random threadRandom = ThreadLocalRandom.current();
  return threadRandom;
}
diff --git a/core/src/test/java/org/apache/iceberg/TestFileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/TestFileGenerationUtil.java
new file mode 100644
index 0000000000..ea44aa73c6
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFileGenerationUtil.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import org.apache.iceberg.MetricsModes.MetricsMode;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.jupiter.api.Test;
+
+public class TestFileGenerationUtil {
+
+ public static final Schema SCHEMA =
+ new Schema(
+ required(1, "int_col", Types.IntegerType.get()),
+ required(2, "long_col", Types.LongType.get()),
+ required(3, "decimal_col", Types.DecimalType.of(10, 10)),
+ required(4, "date_col", Types.DateType.get()),
+ required(5, "timestamp_col", Types.TimestampType.withoutZone()),
+ required(6, "timestamp_tz_col", Types.TimestampType.withZone()),
+ required(7, "str_col", Types.StringType.get()));
+
+ @Test
+ public void testBoundsWithDefaultMetricsConfig() {
+ MetricsConfig metricsConfig = MetricsConfig.getDefault();
+ Metrics metrics =
+ FileGenerationUtil.generateRandomMetrics(
+ SCHEMA,
+ metricsConfig,
+ ImmutableMap.of() /* no known lower bounds */,
+ ImmutableMap.of() /* no known upper bounds */);
+
+ assertThat(metrics.lowerBounds()).hasSize(SCHEMA.columns().size());
+ assertThat(metrics.upperBounds()).hasSize(SCHEMA.columns().size());
+
+ checkBounds(metrics, metricsConfig);
+ }
+
+ @Test
+ public void testBoundsWithSpecificValues() {
+ MetricsConfig metricsConfig = MetricsConfig.getDefault();
+ NestedField intField = SCHEMA.findField("int_col");
+ PrimitiveType type = intField.type().asPrimitiveType();
+ ByteBuffer intLower = Conversions.toByteBuffer(type, 0);
+ ByteBuffer intUpper = Conversions.toByteBuffer(type, Integer.MAX_VALUE);
+ Metrics metrics =
+ FileGenerationUtil.generateRandomMetrics(
+ SCHEMA,
+ metricsConfig,
+ ImmutableMap.of(intField.fieldId(), intLower),
+ ImmutableMap.of(intField.fieldId(), intUpper));
+
+ assertThat(metrics.lowerBounds()).hasSize(SCHEMA.columns().size());
+ assertThat(metrics.upperBounds()).hasSize(SCHEMA.columns().size());
+
+ checkBounds(metrics, metricsConfig);
+
+ ByteBuffer actualIntLower = metrics.lowerBounds().get(intField.fieldId());
+ ByteBuffer actualIntUpper = metrics.upperBounds().get(intField.fieldId());
+ assertThat(actualIntLower).isEqualTo(intLower);
+ assertThat(actualIntUpper).isEqualTo(intUpper);
+ }
+
+ private void checkBounds(Metrics metrics, MetricsConfig metricsConfig) {
+ for (NestedField field : SCHEMA.columns()) {
+ MetricsMode mode = metricsConfig.columnMode(field.name());
+ ByteBuffer lowerBuffer = metrics.lowerBounds().get(field.fieldId());
+ ByteBuffer upperBuffer = metrics.upperBounds().get(field.fieldId());
+ if (mode.equals(MetricsModes.None.get()) ||
mode.equals(MetricsModes.Counts.get())) {
+ assertThat(lowerBuffer).isNull();
+ assertThat(upperBuffer).isNull();
+ } else {
+ checkBounds(field.type().asPrimitiveType(), lowerBuffer, upperBuffer);
+ }
+ }
+ }
+
+ private void checkBounds(PrimitiveType type, ByteBuffer lowerBuffer,
ByteBuffer upperBuffer) {
+ Object lower = Conversions.fromByteBuffer(type, lowerBuffer);
+ Object upper = Conversions.fromByteBuffer(type, upperBuffer);
+ Comparator<Object> cmp = Comparators.forType(type);
+ assertThat(cmp.compare(lower, upper)).isLessThanOrEqualTo(0);
+ }
+}