This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b21a8ce249 API: add StructTransform base class for PartitionKey and 
SortKey. add SortOrderComparators (#7798)
b21a8ce249 is described below

commit b21a8ce2497eb560feb9ed771a1e28ca57d1cf60
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Tue Nov 28 10:41:38 2023 -0800

    API: add StructTransform base class for PartitionKey and SortKey. add 
SortOrderComparators (#7798)
---
 .../main/java/org/apache/iceberg/PartitionKey.java |  97 +---
 api/src/main/java/org/apache/iceberg/SortKey.java  |  56 +++
 .../org/apache/iceberg/SortOrderComparators.java   | 105 +++++
 .../java/org/apache/iceberg/StructTransform.java   | 149 +++++++
 .../apache/iceberg/TestSortOrderComparators.java   | 489 +++++++++++++++++++++
 5 files changed, 814 insertions(+), 82 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionKey.java b/api/src/main/java/org/apache/iceberg/PartitionKey.java
index fc56d1a453..08e3ea319b 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionKey.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionKey.java
@@ -18,13 +18,8 @@
  */
 package org.apache.iceberg;
 
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.util.Arrays;
 import java.util.List;
-import java.util.function.Function;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.SerializableFunction;
+import java.util.stream.Collectors;
 
 /**
  * A struct of partition values.
@@ -32,58 +27,23 @@ import org.apache.iceberg.util.SerializableFunction;
  * <p>Instances of this class can produce partition values from a data row passed to {@link
  * #partition(StructLike)}.
  */
-public class PartitionKey implements StructLike, Serializable {
+public class PartitionKey extends StructTransform {
 
   private final PartitionSpec spec;
-  private final int size;
-  private final Object[] partitionTuple;
-  private final SerializableFunction[] transforms;
-  private final Accessor<StructLike>[] accessors;
+  private final Schema inputSchema;
 
   @SuppressWarnings("unchecked")
   public PartitionKey(PartitionSpec spec, Schema inputSchema) {
+    super(inputSchema, fieldTransform(spec));
     this.spec = spec;
-
-    List<PartitionField> fields = spec.fields();
-    this.size = fields.size();
-    this.partitionTuple = new Object[size];
-    this.transforms = new SerializableFunction[size];
-    this.accessors = (Accessor<StructLike>[]) 
Array.newInstance(Accessor.class, size);
-
-    Schema schema = spec.schema();
-    for (int i = 0; i < size; i += 1) {
-      PartitionField field = fields.get(i);
-      Accessor<StructLike> accessor = 
inputSchema.accessorForField(field.sourceId());
-      Preconditions.checkArgument(
-          accessor != null,
-          "Cannot build accessor for field: " + 
schema.findField(field.sourceId()));
-      this.accessors[i] = accessor;
-      this.transforms[i] = field.transform().bind(accessor.type());
-    }
+    this.inputSchema = inputSchema;
   }
 
   private PartitionKey(PartitionKey toCopy) {
+    // only need deep copy inside StructTransform
+    super(toCopy);
     this.spec = toCopy.spec;
-    this.size = toCopy.size;
-    this.partitionTuple = new Object[toCopy.partitionTuple.length];
-    this.transforms = toCopy.transforms;
-    this.accessors = toCopy.accessors;
-
-    System.arraycopy(toCopy.partitionTuple, 0, this.partitionTuple, 0, 
partitionTuple.length);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[");
-    for (int i = 0; i < partitionTuple.length; i += 1) {
-      if (i > 0) {
-        sb.append(", ");
-      }
-      sb.append(partitionTuple[i]);
-    }
-    sb.append("]");
-    return sb.toString();
+    this.inputSchema = toCopy.inputSchema;
   }
 
   public PartitionKey copy() {
@@ -101,41 +61,14 @@ public class PartitionKey implements StructLike, Serializable {
    */
   @SuppressWarnings("unchecked")
   public void partition(StructLike row) {
-    for (int i = 0; i < partitionTuple.length; i += 1) {
-      Function<Object, Object> transform = transforms[i];
-      partitionTuple[i] = transform.apply(accessors[i].get(row));
-    }
-  }
-
-  @Override
-  public int size() {
-    return size;
-  }
-
-  @Override
-  public <T> T get(int pos, Class<T> javaClass) {
-    return javaClass.cast(partitionTuple[pos]);
-  }
-
-  @Override
-  public <T> void set(int pos, T value) {
-    partitionTuple[pos] = value;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (!(o instanceof PartitionKey)) {
-      return false;
-    }
-
-    PartitionKey that = (PartitionKey) o;
-    return Arrays.equals(partitionTuple, that.partitionTuple);
+    wrap(row);
   }
 
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(partitionTuple);
+  private static List<FieldTransform> fieldTransform(PartitionSpec spec) {
+    return spec.fields().stream()
+        .map(
+            partitionField ->
+                new FieldTransform(partitionField.sourceId(), 
partitionField.transform()))
+        .collect(Collectors.toList());
   }
 }
diff --git a/api/src/main/java/org/apache/iceberg/SortKey.java b/api/src/main/java/org/apache/iceberg/SortKey.java
new file mode 100644
index 0000000000..8a9ccf0bd7
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/SortKey.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A struct of flattened sort field values.
+ *
+ * <p>Instances of this class can produce sort values from a row passed to {@link
+ * #wrap(StructLike)}.
+ */
+public class SortKey extends StructTransform {
+  private final Schema schema;
+  private final SortOrder sortOrder;
+
+  public SortKey(Schema schema, SortOrder sortOrder) {
+    super(schema, fieldTransform(sortOrder));
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+  }
+
+  private SortKey(SortKey toCopy) {
+    // only need deep copy inside StructTransform
+    super(toCopy);
+    this.schema = toCopy.schema;
+    this.sortOrder = toCopy.sortOrder;
+  }
+
+  public SortKey copy() {
+    return new SortKey(this);
+  }
+
+  private static List<FieldTransform> fieldTransform(SortOrder sortOrder) {
+    return sortOrder.fields().stream()
+        .map(sortField -> new FieldTransform(sortField.sourceId(), 
sortField.transform()))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/SortOrderComparators.java b/api/src/main/java/org/apache/iceberg/SortOrderComparators.java
new file mode 100644
index 0000000000..c730127c54
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/SortOrderComparators.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.lang.reflect.Array;
+import java.util.Comparator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class SortOrderComparators {
+  private SortOrderComparators() {}
+
+  /** Compare structs with the specified sort order projection */
+  public static Comparator<StructLike> forSchema(Schema schema, SortOrder 
sortOrder) {
+    Preconditions.checkArgument(sortOrder.isSorted(), "Invalid sort order: 
unsorted");
+    SortOrder.checkCompatibility(sortOrder, schema);
+    return new SortOrderComparator(schema, sortOrder);
+  }
+
+  /** Util method to chain sort direction and null order to the original comparator. */
+  private static Comparator<Object> sortFieldComparator(
+      Comparator<Object> original, SortField sortField) {
+    Comparator<Object> comparator = original;
+    if (sortField == null) {
+      return Comparators.nullsFirst().thenComparing(comparator);
+    }
+
+    if (sortField.direction() == SortDirection.DESC) {
+      comparator = comparator.reversed();
+    }
+
+    if (sortField.nullOrder() == NullOrder.NULLS_FIRST) {
+      comparator = Comparators.nullsFirst().thenComparing(comparator);
+    } else if (sortField.nullOrder() == NullOrder.NULLS_LAST) {
+      comparator = Comparators.nullsLast().thenComparing(comparator);
+    }
+
+    return comparator;
+  }
+
+  private static class SortOrderComparator implements Comparator<StructLike> {
+    private final SortKey leftKey;
+    private final SortKey rightKey;
+    private final int size;
+    private final Comparator<Object>[] comparators;
+    private final Type[] transformResultTypes;
+
+    private SortOrderComparator(Schema schema, SortOrder sortOrder) {
+      this.leftKey = new SortKey(schema, sortOrder);
+      this.rightKey = new SortKey(schema, sortOrder);
+      this.size = sortOrder.fields().size();
+      this.comparators = (Comparator<Object>[]) 
Array.newInstance(Comparator.class, size);
+      this.transformResultTypes = (Type[]) Array.newInstance(Type.class, size);
+
+      for (int i = 0; i < size; ++i) {
+        SortField sortField = sortOrder.fields().get(i);
+        Types.NestedField field = schema.findField(sortField.sourceId());
+        Type transformResultType = 
sortField.transform().getResultType(field.type());
+        Preconditions.checkArgument(
+            transformResultType.isPrimitiveType(), "Invalid transform result 
type: non-primitive");
+        transformResultTypes[i] = transformResultType;
+        Comparator<Object> comparator = 
Comparators.forType(transformResultType.asPrimitiveType());
+        comparators[i] = sortFieldComparator(comparator, sortField);
+      }
+    }
+
+    @Override
+    public int compare(StructLike left, StructLike right) {
+      if (left == right) {
+        return 0;
+      }
+
+      leftKey.wrap(left);
+      rightKey.wrap(right);
+
+      for (int i = 0; i < size; i += 1) {
+        Class<?> valueClass = transformResultTypes[i].typeId().javaClass();
+        int cmp = comparators[i].compare(leftKey.get(i, valueClass), 
rightKey.get(i, valueClass));
+        if (cmp != 0) {
+          return cmp;
+        }
+      }
+
+      return 0;
+    }
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/StructTransform.java b/api/src/main/java/org/apache/iceberg/StructTransform.java
new file mode 100644
index 0000000000..dcb27488a5
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/StructTransform.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.util.SerializableFunction;
+
+/**
+ * A struct of flattened transformed values.
+ *
+ * <p>Instances of this class can produce transformed values from a row passed to {@link
+ * #wrap(StructLike)}.
+ */
+class StructTransform implements StructLike, Serializable {
+
+  private final int size;
+  private final Accessor<StructLike>[] accessors;
+
+  @SuppressWarnings("rawtypes")
+  private final SerializableFunction[] transforms;
+
+  private final Object[] transformedTuple;
+
+  StructTransform(Schema schema, List<FieldTransform> fieldTransforms) {
+    Preconditions.checkArgument(fieldTransforms != null, "Invalid field 
transform list: null");
+
+    this.size = fieldTransforms.size();
+    this.accessors = (Accessor<StructLike>[]) 
Array.newInstance(Accessor.class, size);
+    this.transforms = new SerializableFunction[size];
+
+    for (int i = 0; i < size; ++i) {
+      int sourceFieldId = fieldTransforms.get(i).sourceFieldId();
+      Transform<?, ?> transform = fieldTransforms.get(i).transform();
+      Accessor<StructLike> accessor = schema.accessorForField(sourceFieldId);
+      Preconditions.checkArgument(
+          accessor != null, "Cannot build accessor for field: %s", 
schema.findField(sourceFieldId));
+      this.accessors[i] = accessor;
+      this.transforms[i] = transform.bind(accessor.type());
+    }
+
+    this.transformedTuple = new Object[size];
+  }
+
+  StructTransform(StructTransform toCopy) {
+    this.size = toCopy.size;
+    this.accessors = toCopy.accessors;
+    this.transforms = toCopy.transforms;
+
+    this.transformedTuple = new Object[size];
+    System.arraycopy(toCopy.transformedTuple, 0, this.transformedTuple, 0, 
size);
+  }
+
+  public void wrap(StructLike row) {
+    for (int i = 0; i < transformedTuple.length; i += 1) {
+      Function<Object, Object> transform = transforms[i];
+      transformedTuple[i] = transform.apply(accessors[i].get(row));
+    }
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(transformedTuple[pos]);
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    transformedTuple[pos] = value;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    for (int i = 0; i < transformedTuple.length; i += 1) {
+      if (i > 0) {
+        sb.append(", ");
+      }
+      sb.append(transformedTuple[i]);
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof StructTransform)) {
+      return false;
+    }
+
+    StructTransform that = (StructTransform) o;
+    return Arrays.equals(transformedTuple, that.transformedTuple);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(transformedTuple);
+  }
+
+  /**
+   * Simple POJO for source field id and transform function. {@code Pair} class is not usable here
+   * in API module, as it has an Avro dep and is in the core module.
+   */
+  static class FieldTransform {
+    private final int sourceFieldId;
+    private final Transform<?, ?> transform;
+
+    FieldTransform(int sourceFieldId, Transform<?, ?> transform) {
+      this.sourceFieldId = sourceFieldId;
+      this.transform = transform;
+    }
+
+    int sourceFieldId() {
+      return sourceFieldId;
+    }
+
+    Transform<?, ?> transform() {
+      return transform;
+    }
+  }
+}
diff --git a/api/src/test/java/org/apache/iceberg/TestSortOrderComparators.java b/api/src/test/java/org/apache/iceberg/TestSortOrderComparators.java
new file mode 100644
index 0000000000..90ac3e09b6
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/TestSortOrderComparators.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializableFunction;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestSortOrderComparators {
+
+  private static void assertComparesCorrectly(
+      Schema schema,
+      SortOrder sortOrder,
+      StructLike less,
+      StructLike greater,
+      StructLike lessCopy,
+      StructLike nullValue) {
+
+    Comparator<StructLike> comparator = SortOrderComparators.forSchema(schema, 
sortOrder);
+    // all fields should have the same sort direction in this test class
+    
Assertions.assertThat(sortOrder.fields().stream().map(SortField::direction).distinct())
+        .hasSize(1);
+    SortDirection direction = sortOrder.fields().get(0).direction();
+
+    Assertions.assertThat(comparator.compare(less, less)).isEqualTo(0);
+    Assertions.assertThat(comparator.compare(greater, greater)).isEqualTo(0);
+    Assertions.assertThat(comparator.compare(less, lessCopy)).isEqualTo(0);
+
+    if (direction == SortDirection.ASC) {
+      Assertions.assertThat(comparator.compare(less, greater)).isEqualTo(-1);
+      Assertions.assertThat(comparator.compare(greater, less)).isEqualTo(1);
+      // null first
+      Assertions.assertThat(comparator.compare(nullValue, less)).isEqualTo(-1);
+      Assertions.assertThat(comparator.compare(less, nullValue)).isEqualTo(1);
+    } else {
+      Assertions.assertThat(comparator.compare(less, greater)).isEqualTo(1);
+      Assertions.assertThat(comparator.compare(greater, less)).isEqualTo(-1);
+      // null last
+      Assertions.assertThat(comparator.compare(nullValue, 
greater)).isEqualTo(1);
+      Assertions.assertThat(comparator.compare(less, nullValue)).isEqualTo(-1);
+    }
+  }
+
+  private static Schema singleSortFildSchema(Type type) {
+    return new Schema(
+        Types.NestedField.optional(1, "id", Types.StringType.get()),
+        Types.NestedField.optional(2, "field", type));
+  }
+
+  private static final TestHelpers.Row SINGLE_SORT_FILED_NULL_VALUE =
+      TestHelpers.Row.of("id4", null);
+
+  private static SortOrder ascOrder(Schema schema) {
+    return SortOrder.builderFor(schema).asc("field").build();
+  }
+
+  private static SortOrder descOrder(Schema schema) {
+    return SortOrder.builderFor(schema).desc("field", 
NullOrder.NULLS_LAST).build();
+  }
+
+  private static void testIdentitySortField(
+      Schema schema, TestHelpers.Row less, TestHelpers.Row greater, 
TestHelpers.Row lessCopy) {
+    assertComparesCorrectly(
+        schema, ascOrder(schema), less, greater, lessCopy, 
SINGLE_SORT_FILED_NULL_VALUE);
+    assertComparesCorrectly(
+        schema, descOrder(schema), less, greater, lessCopy, 
SINGLE_SORT_FILED_NULL_VALUE);
+  }
+
+  @Test
+  public void testBoolean() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.BooleanType.get()),
+        TestHelpers.Row.of("id3", Boolean.FALSE),
+        TestHelpers.Row.of("id2", Boolean.TRUE),
+        TestHelpers.Row.of("id1", Boolean.FALSE));
+  }
+
+  @Test
+  public void testInt() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.IntegerType.get()),
+        TestHelpers.Row.of("id3", 111),
+        TestHelpers.Row.of("id2", 222),
+        TestHelpers.Row.of("id1", 111));
+  }
+
+  @Test
+  public void testLong() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.LongType.get()),
+        TestHelpers.Row.of("id3", 111L),
+        TestHelpers.Row.of("id2", 222L),
+        TestHelpers.Row.of("id1", 111L));
+  }
+
+  @Test
+  public void testFloat() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.FloatType.get()),
+        TestHelpers.Row.of("id3", 1.11f),
+        TestHelpers.Row.of("id1", 2.22f),
+        TestHelpers.Row.of("id1", 1.11f));
+  }
+
+  @Test
+  public void testDouble() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.DoubleType.get()),
+        TestHelpers.Row.of("id3", 1.11d),
+        TestHelpers.Row.of("id2", 2.22d),
+        TestHelpers.Row.of("id1", 1.11d));
+  }
+
+  @Test
+  public void testDate() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.DateType.get()),
+        TestHelpers.Row.of("id3", 111),
+        TestHelpers.Row.of("id2", 222),
+        TestHelpers.Row.of("id1", 111));
+  }
+
+  @Test
+  public void testTime() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.TimeType.get()),
+        TestHelpers.Row.of("id3", 111L),
+        TestHelpers.Row.of("id2", 222L),
+        TestHelpers.Row.of("id1", 111L));
+  }
+
+  @Test
+  public void testTimestamp() {
+    Schema schemaWithZone = 
singleSortFildSchema(Types.TimestampType.withZone());
+    Schema schemaWithoutZone = 
singleSortFildSchema(Types.TimestampType.withoutZone());
+
+    long lessMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 0, 0, 
0).toEpochSecond(ZoneOffset.UTC));
+    long greaterMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 1, 0, 
0).toEpochSecond(ZoneOffset.UTC));
+    long lessCopyMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 0, 0, 
0).toEpochSecond(ZoneOffset.UTC));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", lessMicro);
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", greaterMicro);
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", lessCopyMicro);
+    testIdentitySortField(schemaWithZone, less, greater, lessCopy);
+    testIdentitySortField(schemaWithoutZone, less, greater, lessCopy);
+  }
+
+  @Test
+  public void testTimestampTransform() {
+    Schema schema = singleSortFildSchema(Types.TimestampType.withZone());
+    Transform<Long, Integer> bucket = Transforms.day();
+    SerializableFunction<Long, Integer> transform = 
bucket.bind(Types.TimestampType.withZone());
+
+    long lessMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 0, 0, 
0).toEpochSecond(ZoneOffset.UTC));
+    long greaterMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 11, 0, 0, 
0).toEpochSecond(ZoneOffset.UTC));
+    // intentionally changed hour value from 0 to 1; the day values should still be equal
+    long lessCopyMicro =
+        TimeUnit.SECONDS.toMicros(
+            LocalDateTime.of(2022, 1, 10, 1, 0, 
0).toEpochSecond(ZoneOffset.UTC));
+
+    
Assertions.assertThat(transform.apply(lessMicro)).isLessThan(transform.apply(greaterMicro));
+    
Assertions.assertThat(transform.apply(lessMicro)).isEqualTo(transform.apply(lessCopyMicro));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", lessMicro);
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", greaterMicro);
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", lessCopyMicro);
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.day("field"), SortDirection.ASC, 
NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, 
SINGLE_SORT_FILED_NULL_VALUE);
+  }
+
+  @Test
+  public void testString() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.StringType.get()),
+        TestHelpers.Row.of("id3", "aaa"),
+        TestHelpers.Row.of("id2", "bbb"),
+        TestHelpers.Row.of("id1", "aaa"));
+  }
+
+  @Test
+  public void testStringBucket() {
+    Schema schema = singleSortFildSchema(Types.StringType.get());
+    Transform<String, Integer> bucket = Transforms.bucket(4);
+    SerializableFunction<String, Integer> transform = 
bucket.bind(Types.StringType.get());
+
+    
Assertions.assertThat(transform.apply("bbb")).isLessThan(transform.apply("aaa"));
+    
Assertions.assertThat(transform.apply("bbb")).isEqualTo(transform.apply("cca"));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", "bbb");
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", "aaa");
+    // bucket("cca", 4) equals bucket("bbb", 4)
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", "cca");
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.bucket("field", 4), SortDirection.ASC, 
NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, 
SINGLE_SORT_FILED_NULL_VALUE);
+  }
+
+  @Test
+  public void testUuid() {
+    Schema schema = singleSortFildSchema(Types.UUIDType.get());
+    TestHelpers.Row less =
+        TestHelpers.Row.of("id3", 
UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c"));
+    TestHelpers.Row greater =
+        TestHelpers.Row.of("id2", 
UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b"));
+    TestHelpers.Row lessCopy =
+        TestHelpers.Row.of("id1", 
UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c"));
+    testIdentitySortField(schema, less, greater, lessCopy);
+  }
+
+  @Test
+  public void testUUIDBucket() {
+    Schema schema = singleSortFildSchema(Types.UUIDType.get());
+    Transform<UUID, Integer> bucket = Transforms.bucket(4);
+    SerializableFunction<UUID, Integer> transform = 
bucket.bind(Types.UUIDType.get());
+
+    
Assertions.assertThat(transform.apply(UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b")))
+        
.isLessThan(transform.apply(UUID.fromString("86873e7d-1374-4493-8e1d-9095eff7046c")));
+    
Assertions.assertThat(transform.apply(UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b")))
+        
.isEqualTo(transform.apply(UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c")));
+
+    TestHelpers.Row less =
+        TestHelpers.Row.of("id3", 
UUID.fromString("fd02441d-1423-4a3f-8785-c7dd5647e26b"));
+    TestHelpers.Row greater =
+        TestHelpers.Row.of("id2", 
UUID.fromString("86873e7d-1374-4493-8e1d-9095eff7046c"));
+    // bucket value equals that of the less object
+    TestHelpers.Row lessCopy =
+        TestHelpers.Row.of("id1", 
UUID.fromString("81873e7d-1374-4493-8e1d-9095eff7046c"));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.bucket("field", 4), SortDirection.ASC, 
NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, 
SINGLE_SORT_FILED_NULL_VALUE);
+  }
+
+  @Test
+  public void testFixed() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.FixedType.ofLength(3)),
+        TestHelpers.Row.of("id3", ByteBuffer.wrap(new byte[] {1, 2, 3})),
+        TestHelpers.Row.of("id2", ByteBuffer.wrap(new byte[] {3, 2, 1})),
+        TestHelpers.Row.of("id1", ByteBuffer.wrap(new byte[] {1, 2, 3})));
+  }
+
+  @Test
+  public void testBinary() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.BinaryType.get()),
+        TestHelpers.Row.of("id3", ByteBuffer.wrap(new byte[] {1, 1})),
+        TestHelpers.Row.of("id2", ByteBuffer.wrap(new byte[] {1, 1, 1})),
+        TestHelpers.Row.of("id1", ByteBuffer.wrap(new byte[] {1, 1})));
+  }
+
+  @Test
+  public void testBinaryTruncate() {
+    Schema schema = singleSortFildSchema(Types.BinaryType.get());
+    Transform<ByteBuffer, ByteBuffer> truncate = Transforms.truncate(2);
+    SerializableFunction<ByteBuffer, ByteBuffer> transform = 
truncate.bind(Types.BinaryType.get());
+
+    Assertions.assertThat(transform.apply(ByteBuffer.wrap(new byte[] {1, 2, 
3})))
+        .isLessThan(transform.apply(ByteBuffer.wrap(new byte[] {1, 3, 1})));
+    Assertions.assertThat(transform.apply(ByteBuffer.wrap(new byte[] {1, 2, 
3})))
+        .isEqualTo(transform.apply(ByteBuffer.wrap(new byte[] {1, 2, 5, 6})));
+
+    TestHelpers.Row less = TestHelpers.Row.of("id3", ByteBuffer.wrap(new 
byte[] {1, 2, 3}));
+    TestHelpers.Row greater = TestHelpers.Row.of("id2", ByteBuffer.wrap(new 
byte[] {1, 3, 1}));
+    // truncated value equals that of the less object
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id1", ByteBuffer.wrap(new 
byte[] {1, 2, 5, 6}));
+
+    SortOrder sortOrder =
+        SortOrder.builderFor(schema)
+            .sortBy(Expressions.truncate("field", 2), SortDirection.ASC, 
NullOrder.NULLS_FIRST)
+            .build();
+    assertComparesCorrectly(
+        schema, sortOrder, less, greater, lessCopy, 
SINGLE_SORT_FILED_NULL_VALUE);
+  }
+
+  @Test
+  public void testDecimal() {
+    testIdentitySortField(
+        singleSortFildSchema(Types.DecimalType.of(9, 5)),
+        TestHelpers.Row.of("id3", BigDecimal.valueOf(0.1)),
+        TestHelpers.Row.of("id2", BigDecimal.valueOf(0.2)),
+        TestHelpers.Row.of("id1", BigDecimal.valueOf(0.1)));
+  }
+
+  // Exercises sort orders over the two float fields of the optional "location" struct:
+  // lat ascending alone, long descending (nulls last) alone, then lat and long together.
+  @Test
+  public void testStruct() {
+    Types.StructType locationType =
+        Types.StructType.of(
+            Types.NestedField.required(11, "lat", Types.FloatType.get()),
+            Types.NestedField.required(12, "long", Types.FloatType.get()));
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.optional(2, "location", locationType));
+
+    // Scenario 1: ascending on location.lat only; long is 1.0 in every row.
+    SortOrder byLatAsc = SortOrder.builderFor(schema).asc("location.lat").build();
+    TestHelpers.Row lowLat = TestHelpers.Row.of("id4", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row highLat = TestHelpers.Row.of("id3", TestHelpers.Row.of(2.0f, 1.0f));
+    TestHelpers.Row lowLatTwin = TestHelpers.Row.of("id2", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row missingLat = TestHelpers.Row.of("id1", TestHelpers.Row.of(null, 1.0f));
+    assertComparesCorrectly(schema, byLatAsc, lowLat, highLat, lowLatTwin, missingLat);
+
+    // Scenario 2: descending on location.long with nulls last; lat is 1.0 in every row.
+    SortOrder byLongDesc =
+        SortOrder.builderFor(schema).desc("location.long", NullOrder.NULLS_LAST).build();
+    TestHelpers.Row lowLong = TestHelpers.Row.of("id4", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row highLong = TestHelpers.Row.of("id3", TestHelpers.Row.of(1.0f, 2.0f));
+    TestHelpers.Row lowLongTwin = TestHelpers.Row.of("id2", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row missingLong = TestHelpers.Row.of("id1", TestHelpers.Row.of(1.0f, null));
+    assertComparesCorrectly(schema, byLongDesc, lowLong, highLong, lowLongTwin, missingLong);
+
+    // Scenario 3: ascending on lat, then on long. The same baseline row and its twin are
+    // checked against rows that are larger (or null) in lat, in long, and in both fields.
+    SortOrder byLatThenLong =
+        SortOrder.builderFor(schema).asc("location.lat").asc("location.long").build();
+    TestHelpers.Row pairBase = TestHelpers.Row.of("id4", TestHelpers.Row.of(1.0f, 1.0f));
+    TestHelpers.Row pairBaseTwin = TestHelpers.Row.of("id2", TestHelpers.Row.of(1.0f, 1.0f));
+
+    TestHelpers.Row pairHigherLat = TestHelpers.Row.of("id3", TestHelpers.Row.of(2.0f, 1.0f));
+    TestHelpers.Row pairHigherLong = TestHelpers.Row.of("id3", TestHelpers.Row.of(1.0f, 2.0f));
+    TestHelpers.Row pairHigherBoth = TestHelpers.Row.of("id3", TestHelpers.Row.of(2.0f, 2.0f));
+
+    TestHelpers.Row pairNullLat = TestHelpers.Row.of("id1", TestHelpers.Row.of(null, 1.0f));
+    TestHelpers.Row pairNullLong = TestHelpers.Row.of("id1", TestHelpers.Row.of(1.0f, null));
+    TestHelpers.Row pairNullBoth = TestHelpers.Row.of("id1", TestHelpers.Row.of(null, null));
+
+    assertComparesCorrectly(
+        schema, byLatThenLong, pairBase, pairHigherLat, pairBaseTwin, pairNullLat);
+    assertComparesCorrectly(
+        schema, byLatThenLong, pairBase, pairHigherLong, pairBaseTwin, pairNullLong);
+    assertComparesCorrectly(
+        schema, byLatThenLong, pairBase, pairHigherBoth, pairBaseTwin, pairNullBoth);
+  }
+
+  // Sorts on truncate(2) of two binary fields nested in the optional "struct" column. Every
+  // row uses the single byte {1} for "left", so the rows differ only in "right".
+  @Test
+  public void testStructTransform() {
+    Types.StructType pairType =
+        Types.StructType.of(
+            Types.NestedField.required(11, "left", Types.BinaryType.get()),
+            Types.NestedField.required(12, "right", Types.BinaryType.get()));
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.optional(2, "struct", pairType));
+
+    Transform<ByteBuffer, ByteBuffer> truncateToTwo = Transforms.truncate(2);
+    SerializableFunction<ByteBuffer, ByteBuffer> boundTruncate =
+        truncateToTwo.bind(Types.BinaryType.get());
+
+    // Sanity-check the transform itself before relying on it in the sort order:
+    // {2, 3, 4} truncates to something smaller than {9, 3, 4} does ...
+    ByteBuffer prefixOf234 = boundTruncate.apply(ByteBuffer.wrap(new byte[] {2, 3, 4}));
+    ByteBuffer prefixOf934 = boundTruncate.apply(ByteBuffer.wrap(new byte[] {9, 3, 4}));
+    Assertions.assertThat(prefixOf234).isLessThan(prefixOf934);
+    // ... and to the same value as {2, 3, 9}, which differs only in its third byte.
+    ByteBuffer prefixOf234Again = boundTruncate.apply(ByteBuffer.wrap(new byte[] {2, 3, 4}));
+    ByteBuffer prefixOf239 = boundTruncate.apply(ByteBuffer.wrap(new byte[] {2, 3, 9}));
+    Assertions.assertThat(prefixOf234Again).isEqualTo(prefixOf239);
+
+    TestHelpers.Row lessPair =
+        TestHelpers.Row.of(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2, 3, 4}));
+    TestHelpers.Row less = TestHelpers.Row.of("id4", lessPair);
+
+    TestHelpers.Row greaterPair =
+        TestHelpers.Row.of(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {9, 3, 4}));
+    TestHelpers.Row greater = TestHelpers.Row.of("id3", greaterPair);
+
+    TestHelpers.Row lessCopyPair =
+        TestHelpers.Row.of(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2, 3, 9}));
+    TestHelpers.Row lessCopy = TestHelpers.Row.of("id2", lessCopyPair);
+
+    TestHelpers.Row nullRightPair = TestHelpers.Row.of(ByteBuffer.wrap(new byte[] {1}), null);
+    TestHelpers.Row nullRight = TestHelpers.Row.of("id1", nullRightPair);
+
+    SortOrder.Builder orderBuilder = SortOrder.builderFor(schema);
+    orderBuilder.sortBy(
+        Expressions.truncate("struct.left", 2), SortDirection.ASC, NullOrder.NULLS_FIRST);
+    orderBuilder.sortBy(
+        Expressions.truncate("struct.right", 2), SortDirection.ASC, NullOrder.NULLS_FIRST);
+    SortOrder sortOrder = orderBuilder.build();
+
+    assertComparesCorrectly(schema, sortOrder, less, greater, lessCopy, nullRight);
+  }
+
+  // Sorts ascending on user.location.lat and then user.location.long, two float fields that
+  // sit two struct levels below the top-level row.
+  @Test
+  public void testNestedStruct() {
+    Types.StructType locationType =
+        Types.StructType.of(
+            Types.NestedField.required(101, "lat", Types.FloatType.get()),
+            Types.NestedField.required(102, "long", Types.FloatType.get()));
+    Types.StructType userType =
+        Types.StructType.of(
+            Types.NestedField.required(11, "name", Types.StringType.get()),
+            Types.NestedField.optional(12, "location", locationType));
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.StringType.get()),
+            Types.NestedField.optional(2, "user", userType));
+
+    SortOrder latThenLong =
+        SortOrder.builderFor(schema).asc("user.location.lat").asc("user.location.long").build();
+
+    // Baseline row and a second row with the same coordinates but a different id and user name.
+    TestHelpers.Row base =
+        TestHelpers.Row.of("id4", TestHelpers.Row.of("user4", TestHelpers.Row.of(1.0f, 1.0f)));
+    TestHelpers.Row baseTwin =
+        TestHelpers.Row.of("id2", TestHelpers.Row.of("user2", TestHelpers.Row.of(1.0f, 1.0f)));
+
+    // Rows that exceed the baseline in lat, in long, and in both.
+    TestHelpers.Row higherLat =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of("user3", TestHelpers.Row.of(2.0f, 1.0f)));
+    TestHelpers.Row higherLong =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of("user3", TestHelpers.Row.of(1.0f, 2.0f)));
+    TestHelpers.Row higherBoth =
+        TestHelpers.Row.of("id3", TestHelpers.Row.of("user3", TestHelpers.Row.of(2.0f, 2.0f)));
+
+    // Rows with a null in lat, in long, and in both.
+    TestHelpers.Row missingLat =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of("user1", TestHelpers.Row.of(null, 1.0f)));
+    TestHelpers.Row missingLong =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of("user1", TestHelpers.Row.of(1.0f, null)));
+    TestHelpers.Row missingBoth =
+        TestHelpers.Row.of("id1", TestHelpers.Row.of("user1", TestHelpers.Row.of(null, null)));
+
+    assertComparesCorrectly(schema, latThenLong, base, higherLat, baseTwin, missingLat);
+    assertComparesCorrectly(schema, latThenLong, base, higherLong, baseTwin, missingLong);
+    assertComparesCorrectly(schema, latThenLong, base, higherBoth, baseTwin, missingBoth);
+  }
+}


Reply via email to