This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b21a8ce249 API: add StructTransform base class for PartitionKey and
SortKey. add SortOrderComparators (#7798)
b21a8ce249 is described below
commit b21a8ce2497eb560feb9ed771a1e28ca57d1cf60
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Nov 28 10:41:38 2023 -0800
API: add StructTransform base class for PartitionKey and SortKey. add
SortOrderComparators (#7798)
---
.../main/java/org/apache/iceberg/PartitionKey.java | 97 +---
api/src/main/java/org/apache/iceberg/SortKey.java | 56 +++
.../org/apache/iceberg/SortOrderComparators.java | 105 +++++
.../java/org/apache/iceberg/StructTransform.java | 149 +++++++
.../apache/iceberg/TestSortOrderComparators.java | 489 +++++++++++++++++++++
5 files changed, 814 insertions(+), 82 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/PartitionKey.java
b/api/src/main/java/org/apache/iceberg/PartitionKey.java
index fc56d1a453..08e3ea319b 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionKey.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionKey.java
@@ -18,13 +18,8 @@
*/
package org.apache.iceberg;
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.util.Arrays;
import java.util.List;
-import java.util.function.Function;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.SerializableFunction;
+import java.util.stream.Collectors;
/**
* A struct of partition values.
@@ -32,58 +27,23 @@ import org.apache.iceberg.util.SerializableFunction;
* <p>Instances of this class can produce partition values from a data row
passed to {@link
* #partition(StructLike)}.
*/
-public class PartitionKey implements StructLike, Serializable {
+public class PartitionKey extends StructTransform {
private final PartitionSpec spec;
- private final int size;
- private final Object[] partitionTuple;
- private final SerializableFunction[] transforms;
- private final Accessor<StructLike>[] accessors;
+ private final Schema inputSchema;
@SuppressWarnings("unchecked")
public PartitionKey(PartitionSpec spec, Schema inputSchema) {
+ super(inputSchema, fieldTransform(spec));
this.spec = spec;
-
- List<PartitionField> fields = spec.fields();
- this.size = fields.size();
- this.partitionTuple = new Object[size];
- this.transforms = new SerializableFunction[size];
- this.accessors = (Accessor<StructLike>[])
Array.newInstance(Accessor.class, size);
-
- Schema schema = spec.schema();
- for (int i = 0; i < size; i += 1) {
- PartitionField field = fields.get(i);
- Accessor<StructLike> accessor =
inputSchema.accessorForField(field.sourceId());
- Preconditions.checkArgument(
- accessor != null,
- "Cannot build accessor for field: " +
schema.findField(field.sourceId()));
- this.accessors[i] = accessor;
- this.transforms[i] = field.transform().bind(accessor.type());
- }
+ this.inputSchema = inputSchema;
}
private PartitionKey(PartitionKey toCopy) {
+ // only need deep copy inside StructTransform
+ super(toCopy);
this.spec = toCopy.spec;
- this.size = toCopy.size;
- this.partitionTuple = new Object[toCopy.partitionTuple.length];
- this.transforms = toCopy.transforms;
- this.accessors = toCopy.accessors;
-
- System.arraycopy(toCopy.partitionTuple, 0, this.partitionTuple, 0,
partitionTuple.length);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- for (int i = 0; i < partitionTuple.length; i += 1) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append(partitionTuple[i]);
- }
- sb.append("]");
- return sb.toString();
+ this.inputSchema = toCopy.inputSchema;
}
public PartitionKey copy() {
@@ -101,41 +61,14 @@ public class PartitionKey implements StructLike,
Serializable {
*/
@SuppressWarnings("unchecked")
public void partition(StructLike row) {
- for (int i = 0; i < partitionTuple.length; i += 1) {
- Function<Object, Object> transform = transforms[i];
- partitionTuple[i] = transform.apply(accessors[i].get(row));
- }
- }
-
- @Override
- public int size() {
- return size;
- }
-
- @Override
- public <T> T get(int pos, Class<T> javaClass) {
- return javaClass.cast(partitionTuple[pos]);
- }
-
- @Override
- public <T> void set(int pos, T value) {
- partitionTuple[pos] = value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- } else if (!(o instanceof PartitionKey)) {
- return false;
- }
-
- PartitionKey that = (PartitionKey) o;
- return Arrays.equals(partitionTuple, that.partitionTuple);
+ wrap(row);
}
- @Override
- public int hashCode() {
- return Arrays.hashCode(partitionTuple);
+ private static List<FieldTransform> fieldTransform(PartitionSpec spec) {
+ return spec.fields().stream()
+ .map(
+ partitionField ->
+ new FieldTransform(partitionField.sourceId(),
partitionField.transform()))
+ .collect(Collectors.toList());
}
}
diff --git a/api/src/main/java/org/apache/iceberg/SortKey.java
b/api/src/main/java/org/apache/iceberg/SortKey.java
new file mode 100644
index 0000000000..8a9ccf0bd7
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/SortKey.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A flattened struct holding one transformed value per field of a {@link SortOrder}.
+ *
+ * <p>Pass a row to {@link #wrap(StructLike)} to compute the sort values for that row; the values
+ * are then readable through the {@link StructLike} methods of this key.
+ */
+public class SortKey extends StructTransform {
+  private final Schema schema;
+  private final SortOrder sortOrder;
+
+  /**
+   * Creates a sort key for rows of the given schema.
+   *
+   * @param schema schema of the rows that will be passed to {@link #wrap(StructLike)}
+   * @param sortOrder sort order whose fields define the source columns and transforms
+   */
+  public SortKey(Schema schema, SortOrder sortOrder) {
+    super(schema, fieldTransform(sortOrder));
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+  }
+
+  private SortKey(SortKey toCopy) {
+    // the value tuple is copied by the StructTransform copy constructor;
+    // schema and sort order are simply shared with the source key
+    super(toCopy);
+    this.schema = toCopy.schema;
+    this.sortOrder = toCopy.sortOrder;
+  }
+
+  /** Returns a new key that carries a copy of the values currently held by this key. */
+  public SortKey copy() {
+    return new SortKey(this);
+  }
+
+  /** Maps every sort field of the order to its (source field id, transform) pair. */
+  private static List<FieldTransform> fieldTransform(SortOrder sortOrder) {
+    return sortOrder.fields().stream()
+        .map(SortKey::toFieldTransform)
+        .collect(Collectors.toList());
+  }
+
+  /** Builds the (source field id, transform) pair for a single sort field. */
+  private static FieldTransform toFieldTransform(SortField sortField) {
+    return new FieldTransform(sortField.sourceId(), sortField.transform());
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/SortOrderComparators.java
b/api/src/main/java/org/apache/iceberg/SortOrderComparators.java
new file mode 100644
index 0000000000..c730127c54
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/SortOrderComparators.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/** Factory for comparators that order {@link StructLike} rows according to a {@link SortOrder}. */
+public class SortOrderComparators {
+  private SortOrderComparators() {}
+
+  /**
+   * Creates a comparator that compares structs with the specified sort order projection.
+   *
+   * <p>The returned comparator reuses two internal {@link SortKey} buffers on every call to {@code
+   * compare}, so a single instance must not be used by several threads at the same time.
+   *
+   * @param schema schema of the structs that will be compared
+   * @param sortOrder sort order to apply; it must be sorted and is validated against the schema
+   *     with {@link SortOrder#checkCompatibility(SortOrder, Schema)}
+   * @return a comparator that applies the sort fields in order and stops at the first difference
+   * @throws IllegalArgumentException if the sort order is unsorted
+   */
+  public static Comparator<StructLike> forSchema(Schema schema, SortOrder sortOrder) {
+    Preconditions.checkArgument(sortOrder.isSorted(), "Invalid sort order: unsorted");
+    SortOrder.checkCompatibility(sortOrder, schema);
+    return new SortOrderComparator(schema, sortOrder);
+  }
+
+  /**
+   * Util method to chain sort direction and null order to the original comparator.
+   *
+   * @param original comparator for non-null values of the transformed field
+   * @param sortField sort field supplying direction and null order; when null, the original
+   *     comparator is only wrapped with nulls-first handling
+   * @return the comparator with direction and null ordering applied
+   */
+  private static Comparator<Object> sortFieldComparator(
+      Comparator<Object> original, SortField sortField) {
+    Comparator<Object> comparator = original;
+    if (sortField == null) {
+      return Comparators.nullsFirst().thenComparing(comparator);
+    }
+
+    // reverse first so the null ordering below is not flipped together with the direction
+    if (sortField.direction() == SortDirection.DESC) {
+      comparator = comparator.reversed();
+    }
+
+    if (sortField.nullOrder() == NullOrder.NULLS_FIRST) {
+      comparator = Comparators.nullsFirst().thenComparing(comparator);
+    } else if (sortField.nullOrder() == NullOrder.NULLS_LAST) {
+      comparator = Comparators.nullsLast().thenComparing(comparator);
+    }
+
+    return comparator;
+  }
+
+  /**
+   * Comparator that projects both rows into reusable {@link SortKey}s and compares the transformed
+   * values field by field. Not thread-safe because the two keys are mutated on every comparison.
+   */
+  private static class SortOrderComparator implements Comparator<StructLike> {
+    private final SortKey leftKey;
+    private final SortKey rightKey;
+    private final int size;
+    private final Comparator<Object>[] comparators;
+    // Java class of each transformed value, resolved once instead of on every comparison
+    private final Class<?>[] valueClasses;
+
+    // the generic array cast is safe: the array is created here and only holds Comparator<Object>
+    @SuppressWarnings("unchecked")
+    private SortOrderComparator(Schema schema, SortOrder sortOrder) {
+      this.leftKey = new SortKey(schema, sortOrder);
+      this.rightKey = new SortKey(schema, sortOrder);
+      this.size = sortOrder.fields().size();
+      this.comparators = (Comparator<Object>[]) Array.newInstance(Comparator.class, size);
+      this.valueClasses = new Class<?>[size];
+
+      for (int i = 0; i < size; ++i) {
+        SortField sortField = sortOrder.fields().get(i);
+        Types.NestedField field = schema.findField(sortField.sourceId());
+        Type transformResultType = sortField.transform().getResultType(field.type());
+        Preconditions.checkArgument(
+            transformResultType.isPrimitiveType(), "Invalid transform result type: non-primitive");
+        valueClasses[i] = transformResultType.typeId().javaClass();
+        Comparator<Object> comparator = Comparators.forType(transformResultType.asPrimitiveType());
+        comparators[i] = sortFieldComparator(comparator, sortField);
+      }
+    }
+
+    @Override
+    public int compare(StructLike left, StructLike right) {
+      if (left == right) {
+        return 0;
+      }
+
+      leftKey.wrap(left);
+      rightKey.wrap(right);
+
+      for (int i = 0; i < size; i += 1) {
+        Class<?> valueClass = valueClasses[i];
+        int cmp = comparators[i].compare(leftKey.get(i, valueClass), rightKey.get(i, valueClass));
+        if (cmp != 0) {
+          return cmp;
+        }
+      }
+
+      return 0;
+    }
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/StructTransform.java
b/api/src/main/java/org/apache/iceberg/StructTransform.java
new file mode 100644
index 0000000000..dcb27488a5
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/StructTransform.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.util.SerializableFunction;
+
+/**
+ * A struct of flattened transformed values.
+ *
+ * <p>Each position holds the result of applying one bound transform to one source field of the
+ * row most recently passed to {@link #wrap(StructLike)}.
+ */
+class StructTransform implements StructLike, Serializable {
+
+  private final int size;
+  private final Accessor<StructLike>[] accessors;
+
+  @SuppressWarnings("rawtypes")
+  private final SerializableFunction[] transforms;
+
+  private final Object[] transformedTuple;
+
+  /**
+   * Resolves an accessor and a bound transform function for every entry of the list.
+   *
+   * @param schema schema used to look up the accessor of each source field
+   * @param fieldTransforms source field ids and transforms, one per output position
+   * @throws IllegalArgumentException if the list is null or a source field has no accessor
+   */
+  @SuppressWarnings("unchecked")
+  StructTransform(Schema schema, List<FieldTransform> fieldTransforms) {
+    Preconditions.checkArgument(fieldTransforms != null, "Invalid field transform list: null");
+
+    this.size = fieldTransforms.size();
+    this.accessors = (Accessor<StructLike>[]) Array.newInstance(Accessor.class, size);
+    this.transforms = new SerializableFunction[size];
+    this.transformedTuple = new Object[size];
+
+    int pos = 0;
+    for (FieldTransform fieldTransform : fieldTransforms) {
+      int sourceFieldId = fieldTransform.sourceFieldId();
+      Accessor<StructLike> accessor = schema.accessorForField(sourceFieldId);
+      Preconditions.checkArgument(
+          accessor != null, "Cannot build accessor for field: %s", schema.findField(sourceFieldId));
+      this.accessors[pos] = accessor;
+      this.transforms[pos] = fieldTransform.transform().bind(accessor.type());
+      pos += 1;
+    }
+  }
+
+  /**
+   * Copy constructor: accessors and transforms are shared with the source, while the value tuple
+   * is copied into a fresh array.
+   */
+  StructTransform(StructTransform toCopy) {
+    this.size = toCopy.size;
+    this.accessors = toCopy.accessors;
+    this.transforms = toCopy.transforms;
+
+    this.transformedTuple = new Object[size];
+    System.arraycopy(toCopy.transformedTuple, 0, this.transformedTuple, 0, size);
+  }
+
+  /** Recomputes every transformed value from the given row, overwriting the previous values. */
+  @SuppressWarnings("unchecked")
+  public void wrap(StructLike row) {
+    for (int pos = 0; pos < transformedTuple.length; pos += 1) {
+      Function<Object, Object> boundTransform = transforms[pos];
+      Object sourceValue = accessors[pos].get(row);
+      transformedTuple[pos] = boundTransform.apply(sourceValue);
+    }
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    Object value = transformedTuple[pos];
+    return javaClass.cast(value);
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    transformedTuple[pos] = value;
+  }
+
+  /** Renders the transformed values as {@code [v0, v1, ...]}. */
+  @Override
+  public String toString() {
+    return Arrays.toString(transformedTuple);
+  }
+
+  /** Two instances are equal when their transformed value tuples are equal element by element. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof StructTransform)) {
+      return false;
+    }
+
+    return Arrays.equals(transformedTuple, ((StructTransform) o).transformedTuple);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(transformedTuple);
+  }
+
+  /**
+   * Simple POJO for source field id and transform function. {@code Pair} class is not usable here
+   * in API module, as it has an Avro dep and is in the core module.
+   */
+  static class FieldTransform {
+    private final int sourceFieldId;
+    private final Transform<?, ?> transform;
+
+    FieldTransform(int sourceFieldId, Transform<?, ?> transform) {
+      this.sourceFieldId = sourceFieldId;
+      this.transform = transform;
+    }
+
+    int sourceFieldId() {
+      return sourceFieldId;
+    }
+
+    Transform<?, ?> transform() {
+      return transform;
+    }
+  }
+}
diff --git a/api/src/test/java/org/apache/iceberg/TestSortOrderComparators.java
b/api/src/test/java/org/apache/iceberg/TestSortOrderComparators.java
new file mode 100644
index 0000000000..90ac3e09b6
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/TestSortOrderComparators.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializableFunction;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Tests for the comparators produced by {@link SortOrderComparators#forSchema}. */
+public class TestSortOrderComparators {
+
+  /**
+   * Checks a comparator built from {@code sortOrder} against rows with a known relative order.
+   *
+   * <p>Only the sign of each comparison is asserted, as that is all the {@link Comparator}
+   * contract guarantees. Ascending orders are expected to place nulls first and descending orders
+   * to place nulls last.
+   *
+   * @param less row that sorts before {@code greater} in ascending order
+   * @param greater row that sorts after {@code less} in ascending order
+   * @param lessCopy a different row whose sort values equal those of {@code less}
+   * @param nullValue row with a null in the compared sort field
+   */
+  private static void assertComparesCorrectly(
+      Schema schema,
+      SortOrder sortOrder,
+      StructLike less,
+      StructLike greater,
+      StructLike lessCopy,
+      StructLike nullValue) {
+
+    Comparator<StructLike> comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    // all fields should have the same sort direction in this test class
+    Assertions.assertThat(sortOrder.fields().stream().map(SortField::direction).distinct())
+        .hasSize(1);
+    SortDirection direction = sortOrder.fields().get(0).direction();
+
+    Assertions.assertThat(comparator.compare(less, less)).isEqualTo(0);
+    Assertions.assertThat(comparator.compare(greater, greater)).isEqualTo(0);
+    Assertions.assertThat(comparator.compare(less, lessCopy)).isEqualTo(0);
+
+    if (direction == SortDirection.ASC) {
+      Assertions.assertThat(comparator.compare(less, greater)).isLessThan(0);
+      Assertions.assertThat(comparator.compare(greater, less)).isGreaterThan(0);
+      // null first
+      Assertions.assertThat(comparator.compare(nullValue, less)).isLessThan(0);
+      Assertions.assertThat(comparator.compare(less, nullValue)).isGreaterThan(0);
+    } else {
+      Assertions.assertThat(comparator.compare(less, greater)).isGreaterThan(0);
+      Assertions.assertThat(comparator.compare(greater, less)).isLessThan(0);
+      // null last
+      Assertions.assertThat(comparator.compare(nullValue, greater)).isGreaterThan(0);
+      Assertions.assertThat(comparator.compare(less, nullValue)).isLessThan(0);
+    }
+  }
+
+  /** Builds a two-column schema: a string {@code id} and a {@code field} of the given type. */
+  private static Schema singleSortFieldSchema(Type type) {
+    return new Schema(
+        Types.NestedField.optional(1, "id", Types.StringType.get()),
+        Types.NestedField.optional(2, "field", type));
+  }
+
+  /** Row for the single-sort-field schema whose {@code field} value is null. */
+  private static final TestHelpers.Row SINGLE_SORT_FIELD_NULL_VALUE =
+      TestHelpers.Row.of("id4", null);
+
+  private static SortOrder ascOrder(Schema schema) {
+    return SortOrder.builderFor(schema).asc("field").build();
+  }
+
+  private static SortOrder descOrder(Schema schema) {
+    return SortOrder.builderFor(schema).desc("field", NullOrder.NULLS_LAST).build();
+  }
+
+  /** Runs the comparison checks on {@code field} in both ascending and descending order. */
+  private static void testIdentitySortField(
+      Schema schema, TestHelpers.Row less, TestHelpers.Row greater, TestHelpers.Row lessCopy) {
+    assertComparesCorrectly(
+        schema, ascOrder(schema), less, greater, lessCopy, SINGLE_SORT_FIELD_NULL_VALUE);
+    assertComparesCorrectly(
+        schema, descOrder(schema), less, greater, lessCopy, SINGLE_SORT_FIELD_NULL_VALUE);
+  }
+
+  @Test
+  public void testBoolean() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.BooleanType.get()),
+        TestHelpers.Row.of("id3", Boolean.FALSE),
+        TestHelpers.Row.of("id2", Boolean.TRUE),
+        TestHelpers.Row.of("id1", Boolean.FALSE));
+  }
+
+  @Test
+  public void testInt() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.IntegerType.get()),
+        TestHelpers.Row.of("id3", 111),
+        TestHelpers.Row.of("id2", 222),
+        TestHelpers.Row.of("id1", 111));
+  }
+
+  @Test
+  public void testLong() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.LongType.get()),
+        TestHelpers.Row.of("id3", 111L),
+        TestHelpers.Row.of("id2", 222L),
+        TestHelpers.Row.of("id1", 111L));
+  }
+
+  @Test
+  public void testFloat() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.FloatType.get()),
+        TestHelpers.Row.of("id3", 1.11f),
+        TestHelpers.Row.of("id2", 2.22f),
+        TestHelpers.Row.of("id1", 1.11f));
+  }
+
+  @Test
+  public void testDouble() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.DoubleType.get()),
+        TestHelpers.Row.of("id3", 1.11d),
+        TestHelpers.Row.of("id2", 2.22d),
+        TestHelpers.Row.of("id1", 1.11d));
+  }
+
+  @Test
+  public void testDate() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.DateType.get()),
+        TestHelpers.Row.of("id3", 111),
+        TestHelpers.Row.of("id2", 222),
+        TestHelpers.Row.of("id1", 111));
+  }
+
+  @Test
+  public void testTime() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.TimeType.get()),
+        TestHelpers.Row.of("id3", 111L),
+        TestHelpers.Row.of("id2", 222L),
+        TestHelpers.Row.of("id1", 111L));
+  }
+
+  @Test
+  public void testTimestamp() {
+    Schema schemaWithZone = singleSortFieldSchema(Types.TimestampType.withZone());
+    Schema schemaWithoutZone = singleSortFieldSchema(Types.TimestampType.withoutZone());
+
+    long lessMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 0, 0, 0).toEpochSecond(ZoneOffset.UTC));
+    long greaterMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 1, 0, 0).toEpochSecond(ZoneOffset.UTC));
+    long lessCopyMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 0, 0, 0).toEpochSecond(ZoneOffset.UTC));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", lessMicro);
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", greaterMicro);
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", lessCopyMicro);
+    testIdentitySortField(schemaWithZone, less, greater, lessCopy);
+    testIdentitySortField(schemaWithoutZone, less, greater, lessCopy);
+  }
+
+  @Test
+  public void testTimestampTransform() {
+    Schema schema = singleSortFieldSchema(Types.TimestampType.withZone());
+    Transform<Long, Integer> day = Transforms.day();
+    SerializableFunction<Long, Integer> transform = day.bind(Types.TimestampType.withZone());
+
+    long lessMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 0, 0, 0).toEpochSecond(ZoneOffset.UTC));
+    long greaterMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 11, 0, 0, 0).toEpochSecond(ZoneOffset.UTC));
+    // intentionally changed hour value from 0 to 1. days should equal
+    long lessCopyMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 1, 0, 0).toEpochSecond(ZoneOffset.UTC));
+
+    Assertions.assertThat(transform.apply(lessMicro)).isLessThan(transform.apply(greaterMicro));
+    Assertions.assertThat(transform.apply(lessMicro)).isEqualTo(transform.apply(lessCopyMicro));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", lessMicro);
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", greaterMicro);
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", lessCopyMicro);
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.day("field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, SINGLE_SORT_FIELD_NULL_VALUE);
+  }
+
+  @Test
+  public void testString() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.StringType.get()),
+        TestHelpers.Row.of("id3", "aaa"),
+        TestHelpers.Row.of("id2", "bbb"),
+        TestHelpers.Row.of("id1", "aaa"));
+  }
+
+  @Test
+  public void testStringBucket() {
+    Schema schema = singleSortFieldSchema(Types.StringType.get());
+    Transform<String, Integer> bucket = Transforms.bucket(4);
+    SerializableFunction<String, Integer> transform = bucket.bind(Types.StringType.get());
+
+    Assertions.assertThat(transform.apply("bbb")).isLessThan(transform.apply("aaa"));
+    Assertions.assertThat(transform.apply("bbb")).isEqualTo(transform.apply("cca"));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", "bbb");
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", "aaa");
+    // bucket("cca", 4) equals to bucket("bbb", 4)
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", "cca");
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.bucket("field", 4), SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, SINGLE_SORT_FIELD_NULL_VALUE);
+  }
+
+  @Test
+  public void testUuid() {
+    Schema schema = singleSortFieldSchema(Types.UUIDType.get());
+    TestHelpers.Row less =
+        TestHelpers.Row.of("id3", UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c"));
+    TestHelpers.Row greater =
+        TestHelpers.Row.of("id2", UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b"));
+    TestHelpers.Row lessCopy =
+        TestHelpers.Row.of("id1", UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c"));
+    testIdentitySortField(schema, less, greater, lessCopy);
+  }
+
+  @Test
+  public void testUUIDBucket() {
+    Schema schema = singleSortFieldSchema(Types.UUIDType.get());
+    Transform<UUID, Integer> bucket = Transforms.bucket(4);
+    SerializableFunction<UUID, Integer> transform = bucket.bind(Types.UUIDType.get());
+
+    Assertions.assertThat(transform.apply(UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b")))
+        .isLessThan(transform.apply(UUID.fromString("86873e7d-1374-4493-8e1d-9095eff7046c")));
+    Assertions.assertThat(transform.apply(UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b")))
+        .isEqualTo(transform.apply(UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c")));
+
+    TestHelpers.Row less =
+        TestHelpers.Row.of("id3", UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b"));
+    TestHelpers.Row greater =
+        TestHelpers.Row.of("id2", UUID.fromString("86873e7d-1374-4493-8e1d-9095eff7046c"));
+    // bucket value equals to the less object
+    TestHelpers.Row lessCopy =
+        TestHelpers.Row.of("id1", UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c"));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.bucket("field", 4), SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, SINGLE_SORT_FIELD_NULL_VALUE);
+  }
+
+  @Test
+  public void testFixed() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.FixedType.ofLength(3)),
+        TestHelpers.Row.of("id3", ByteBuffer.wrap(new byte[] {1, 2, 3})),
+        TestHelpers.Row.of("id2", ByteBuffer.wrap(new byte[] {3, 2, 1})),
+        TestHelpers.Row.of("id1", ByteBuffer.wrap(new byte[] {1, 2, 3})));
+  }
+
+  @Test
+  public void testBinary() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.BinaryType.get()),
+        TestHelpers.Row.of("id3", ByteBuffer.wrap(new byte[] {1, 1})),
+        TestHelpers.Row.of("id2", ByteBuffer.wrap(new byte[] {1, 1, 1})),
+        TestHelpers.Row.of("id1", ByteBuffer.wrap(new byte[] {1, 1})));
+  }
+
+  @Test
+  public void testBinaryTruncate() {
+    Schema schema = singleSortFieldSchema(Types.BinaryType.get());
+    Transform<ByteBuffer, ByteBuffer> truncate = Transforms.truncate(2);
+    SerializableFunction<ByteBuffer, ByteBuffer> transform =
+        truncate.bind(Types.BinaryType.get());
+
+    Assertions.assertThat(transform.apply(ByteBuffer.wrap(new byte[] {1, 2, 3})))
+        .isLessThan(transform.apply(ByteBuffer.wrap(new byte[] {1, 3, 1})));
+    Assertions.assertThat(transform.apply(ByteBuffer.wrap(new byte[] {1, 2, 3})))
+        .isEqualTo(transform.apply(ByteBuffer.wrap(new byte[] {1, 2, 5, 6})));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", ByteBuffer.wrap(new byte[] {1, 2, 3}));
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", ByteBuffer.wrap(new byte[] {1, 3, 1}));
+    // truncated value equals to that of the less object
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", ByteBuffer.wrap(new byte[] {1, 2, 5, 6}));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.truncate("field", 2), SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, SINGLE_SORT_FIELD_NULL_VALUE);
+  }
+
+  @Test
+  public void testDecimal() {
+    testIdentitySortField(
+        singleSortFieldSchema(Types.DecimalType.of(9, 5)),
+        TestHelpers.Row.of("id3", BigDecimal.valueOf(0.1)),
+        TestHelpers.Row.of("id2", BigDecimal.valueOf(0.2)),
+        TestHelpers.Row.of("id1", BigDecimal.valueOf(0.1)));
+  }
+
+  @Test
+  public void testStruct() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.optional(
+                2,
+                "location",
+                Types.StructType.of(
+                    Types.NestedField.required(11, "lat", Types.FloatType.get()),
+                    Types.NestedField.required(12, "long", Types.FloatType.get()))));
+
+    SortOrder latOnlyAsc = SortOrder.builderFor(schema).asc("location.lat").build();
+    TestHelpers.Row lessLat = TestHelpers.Row.of("id4", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row greaterLat = TestHelpers.Row.of("id3", TestHelpers.Row.of(2.0f, 1.0f));
+    TestHelpers.Row lessLatCopy = TestHelpers.Row.of("id2", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row nullLat = TestHelpers.Row.of("id1", TestHelpers.Row.of(null, 1.0f));
+    assertComparesCorrectly(schema, latOnlyAsc, lessLat, greaterLat, lessLatCopy, nullLat);
+
+    SortOrder longOnlyDesc =
+        SortOrder.builderFor(schema).desc("location.long", NullOrder.NULLS_LAST).build();
+    TestHelpers.Row lessLong = TestHelpers.Row.of("id4", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row greaterLong = TestHelpers.Row.of("id3", TestHelpers.Row.of(1.0f, 2.0f));
+    TestHelpers.Row lessLongCopy = TestHelpers.Row.of("id2", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row nullLong = TestHelpers.Row.of("id1", TestHelpers.Row.of(1.0f, null));
+    assertComparesCorrectly(schema, longOnlyDesc, lessLong, greaterLong, lessLongCopy, nullLong);
+
+    SortOrder twoFieldsSortOrder =
+        SortOrder.builderFor(schema).asc("location.lat").asc("location.long").build();
+    TestHelpers.Row lessTwoSortFields = TestHelpers.Row.of("id4", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row greaterLatTwoSortFields =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of(2.0f, 1.0f));
+    TestHelpers.Row greaterLongTwoSortFields =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of(1.0f, 2.0f));
+    TestHelpers.Row greaterBothTwoSortFields =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of(2.0f, 2.0f));
+    TestHelpers.Row lessTwoSortFieldsCopy =
+        TestHelpers.Row.of("id2", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row nullLatTwoSortFields =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of(null, 1.0f));
+    TestHelpers.Row nullLongTwoSortFields =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of(1.0f, null));
+    TestHelpers.Row nullBothTwoSortFields =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of(null, null));
+    assertComparesCorrectly(
+        schema,
+        twoFieldsSortOrder,
+        lessTwoSortFields,
+        greaterLatTwoSortFields,
+        lessTwoSortFieldsCopy,
+        nullLatTwoSortFields);
+    assertComparesCorrectly(
+        schema,
+        twoFieldsSortOrder,
+        lessTwoSortFields,
+        greaterLongTwoSortFields,
+        lessTwoSortFieldsCopy,
+        nullLongTwoSortFields);
+    assertComparesCorrectly(
+        schema,
+        twoFieldsSortOrder,
+        lessTwoSortFields,
+        greaterBothTwoSortFields,
+        lessTwoSortFieldsCopy,
+        nullBothTwoSortFields);
+  }
+
+  @Test
+  public void testStructTransform() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.optional(
+                2,
+                "struct",
+                Types.StructType.of(
+                    Types.NestedField.required(11, "left", Types.BinaryType.get()),
+                    Types.NestedField.required(12, "right", Types.BinaryType.get()))));
+
+    Transform<ByteBuffer, ByteBuffer> truncate = Transforms.truncate(2);
+    SerializableFunction<ByteBuffer, ByteBuffer> transform =
+        truncate.bind(Types.BinaryType.get());
+
+    Assertions.assertThat(transform.apply(ByteBuffer.wrap(new byte[] {2, 3, 4})))
+        .isLessThan(transform.apply(ByteBuffer.wrap(new byte[] {9, 3, 4})));
+    Assertions.assertThat(transform.apply(ByteBuffer.wrap(new byte[] {2, 3, 4})))
+        .isEqualTo(transform.apply(ByteBuffer.wrap(new byte[] {2, 3, 9})));
+
+    TestHelpers.Row less =
+        TestHelpers.Row.of(
+            "id4",
+            TestHelpers.Row.of(
+                ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2, 3, 4})));
+    TestHelpers.Row greater =
+        TestHelpers.Row.of(
+            "id3",
+            TestHelpers.Row.of(
+                ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {9, 3, 4})));
+    TestHelpers.Row lessCopy =
+        TestHelpers.Row.of(
+            "id2",
+            TestHelpers.Row.of(
+                ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2, 3, 9})));
+    TestHelpers.Row nullRight =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of(ByteBuffer.wrap(new byte[] {1}), null));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(
+                Expressions.truncate("struct.left", 2), SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .sortBy(
+                Expressions.truncate("struct.right", 2), SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(schema, sortOrder, less, greater, lessCopy, nullRight);
+  }
+
+  @Test
+  public void testNestedStruct() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.optional(
+                2,
+                "user",
+                Types.StructType.of(
+                    Types.NestedField.required(11, "name", Types.StringType.get()),
+                    Types.NestedField.optional(
+                        12,
+                        "location",
+                        Types.StructType.of(
+                            Types.NestedField.required(101, "lat", Types.FloatType.get()),
+                            Types.NestedField.required(102, "long", Types.FloatType.get()))))));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema).asc("user.location.lat").asc("user.location.long").build();
+    TestHelpers.Row less =
+        TestHelpers.Row.of("id4", TestHelpers.Row.of("user4", TestHelpers.Row.of(1.0f, 1.0f)));
+    TestHelpers.Row greaterLat =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of("user3", TestHelpers.Row.of(2.0f, 1.0f)));
+    TestHelpers.Row greaterLong =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of("user3", TestHelpers.Row.of(1.0f, 2.0f)));
+    TestHelpers.Row greaterBoth =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of("user3", TestHelpers.Row.of(2.0f, 2.0f)));
+    TestHelpers.Row lessCopy =
+        TestHelpers.Row.of("id2", TestHelpers.Row.of("user2", TestHelpers.Row.of(1.0f, 1.0f)));
+    TestHelpers.Row nullLat =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of("user1", TestHelpers.Row.of(null, 1.0f)));
+    TestHelpers.Row nullLong =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of("user1", TestHelpers.Row.of(1.0f, null)));
+    TestHelpers.Row nullBoth =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of("user1", TestHelpers.Row.of(null, null)));
+    assertComparesCorrectly(schema, sortOrder, less, greaterLat, lessCopy, nullLat);
+    assertComparesCorrectly(schema, sortOrder, less, greaterLong, lessCopy, nullLong);
+    assertComparesCorrectly(schema, sortOrder, less, greaterBoth, lessCopy, nullBoth);
+  }
+}