This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new a0d86a84 Support UUID type when converting Iceberg to Delta
a0d86a84 is described below

commit a0d86a8489eab74c3e23d08e5790cad488cd7c7a
Author: Daniel Tu <danieltu.l...@gmail.com>
AuthorDate: Sun Sep 22 22:23:34 2024 -0700

    Support UUID type when converting Iceberg to Delta
---
 .../apache/xtable/model/schema/InternalSchema.java |   2 +
 .../apache/xtable/model/schema/InternalType.java   |   1 +
 .../apache/xtable/avro/AvroSchemaConverter.java    |  14 +-
 .../apache/xtable/delta/DeltaSchemaExtractor.java  |  33 ++++-
 .../xtable/hudi/HudiPartitionValuesExtractor.java  |   1 +
 .../xtable/iceberg/IcebergSchemaExtractor.java     |   4 +-
 .../test/java/org/apache/xtable/GenericTable.java  |  16 +++
 .../org/apache/xtable/ITConversionController.java  | 148 ++++++++++++++++++++-
 .../java/org/apache/xtable/TestIcebergTable.java   |  19 ++-
 .../xtable/avro/TestAvroSchemaConverter.java       |  45 +++++++
 .../xtable/delta/TestDeltaSchemaExtractor.java     |  43 ++++++
 .../xtable/iceberg/TestIcebergDataHelper.java      |  35 ++++-
 .../xtable/iceberg/TestIcebergSchemaExtractor.java |   5 +-
 13 files changed, 342 insertions(+), 24 deletions(-)

diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
index 5e001c6c..20af37e0 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
@@ -78,6 +78,8 @@ public class InternalSchema {
     MILLIS
   }
 
+  public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";
+
   /**
   * Performs a level-order traversal of the schema and returns a list of all fields. Use this
   * method to get a list that includes nested fields. Use {@link InternalSchema#getFields()} when
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
index c2f1a223..e1b1049d 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
@@ -38,6 +38,7 @@ public enum InternalType {
   LIST,
   MAP,
   UNION,
+  UUID,
   FIXED,
   STRING,
   BYTES,
diff --git a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
index 346dcded..9f40d29e 100644
--- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
@@ -132,8 +132,13 @@ public class AvroSchemaConverter {
           break;
         }
         if (schema.getType() == Schema.Type.FIXED) {
-          metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
-          newDataType = InternalType.FIXED;
+          String xtableLogicalType = schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE);
+          if ("uuid".equals(xtableLogicalType)) {
+            newDataType = InternalType.UUID;
+          } else {
+            metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize());
+            newDataType = InternalType.FIXED;
+          }
         } else {
           newDataType = InternalType.BYTES;
         }
@@ -435,6 +440,11 @@ public class AvroSchemaConverter {
             Schema.createFixed(
                 internalSchema.getName(), internalSchema.getComment(), null, fixedSize),
             internalSchema);
+      case UUID:
+        Schema uuidSchema =
+            Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16);
+        uuidSchema.addProp(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
+        return finalizeSchema(uuidSchema, internalSchema);
       default:
         throw new UnsupportedSchemaTypeException(
             "Encountered unhandled type during InternalSchema to Avro 
conversion: "
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index fa425ef2..e312761f 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
@@ -73,7 +74,7 @@ public class DeltaSchemaExtractor {
                         field.getName(),
                         convertFieldType(field),
                         field.getSchema().isNullable(),
-                        Metadata.empty()))
+                        getMetaData(field.getSchema().getDataType())))
             .toArray(StructField[]::new);
     return new StructType(fields);
   }
@@ -90,6 +91,7 @@ public class DeltaSchemaExtractor {
         return DataTypes.LongType;
       case BYTES:
       case FIXED:
+      case UUID:
         return DataTypes.BinaryType;
       case BOOLEAN:
         return DataTypes.BooleanType;
@@ -142,12 +144,24 @@ public class DeltaSchemaExtractor {
     }
   }
 
+  private Metadata getMetaData(InternalType type) {
+    if (type == InternalType.UUID) {
+      return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
+    } else {
+      return Metadata.empty();
+    }
+  }
+
   public InternalSchema toInternalSchema(StructType structType) {
-    return toInternalSchema(structType, null, false, null);
+    return toInternalSchema(structType, null, false, null, null);
   }
 
   private InternalSchema toInternalSchema(
-      DataType dataType, String parentPath, boolean nullable, String comment) {
+      DataType dataType,
+      String parentPath,
+      boolean nullable,
+      String comment,
+      Metadata originalMetadata) {
     Map<InternalSchema.MetadataKey, Object> metadata = null;
     List<InternalField> fields = null;
     InternalType type;
@@ -172,7 +186,12 @@ public class DeltaSchemaExtractor {
         type = InternalType.DOUBLE;
         break;
       case "binary":
-        type = InternalType.BYTES;
+        if (originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)
+            && "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) {
+          type = InternalType.UUID;
+        } else {
+          type = InternalType.BYTES;
+        }
         break;
       case "long":
         type = InternalType.LONG;
@@ -210,7 +229,8 @@ public class DeltaSchemaExtractor {
                               field.dataType(),
                               SchemaUtils.getFullyQualifiedPath(parentPath, field.name()),
                               field.nullable(),
-                              fieldComment);
+                              fieldComment,
+                              field.metadata());
                       return InternalField.builder()
                           .name(field.name())
                           .fieldId(fieldId)
@@ -238,6 +258,7 @@ public class DeltaSchemaExtractor {
                 SchemaUtils.getFullyQualifiedPath(
                     parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
                 arrayType.containsNull(),
+                null,
                 null);
         InternalField elementField =
             InternalField.builder()
@@ -256,6 +277,7 @@ public class DeltaSchemaExtractor {
                 SchemaUtils.getFullyQualifiedPath(
                     parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
                 false,
+                null,
                 null);
         InternalField keyField =
             InternalField.builder()
@@ -269,6 +291,7 @@ public class DeltaSchemaExtractor {
                 SchemaUtils.getFullyQualifiedPath(
                     parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
                 mapType.valueContainsNull(),
+                null,
                 null);
         InternalField valueField =
             InternalField.builder()
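
Note: on the Delta side the physical type stays binary, so the extractor above
persists the same marker in StructField metadata and reads it back when converting
to the internal schema. A minimal sketch, assuming only Spark's public types API
("uuid_col" is an illustrative field name):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.MetadataBuilder;
    import org.apache.spark.sql.types.StructField;

    public class DeltaUuidMetadataSketch {
      public static void main(String[] args) {
        // fromInternalSchema: a UUID column becomes binary plus a metadata marker
        Metadata marker =
            new MetadataBuilder().putString("xtableLogicalType", "uuid").build();
        StructField field =
            new StructField("uuid_col", DataTypes.BinaryType, false, marker);

        // toInternalSchema: binary with the marker maps back to UUID, else BYTES
        Metadata m = field.metadata();
        boolean isUuid =
            m.contains("xtableLogicalType")
                && "uuid".equals(m.getString("xtableLogicalType"));
        System.out.println(isUuid); // prints: true
      }
    }
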
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
index 0b95e65e..a55968d9 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
@@ -159,6 +159,7 @@ public class HudiPartitionValuesExtractor {
         break;
       case FIXED:
       case BYTES:
+      case UUID:
         parsedValue = valueAsString.getBytes(StandardCharsets.UTF_8);
         break;
       case BOOLEAN:
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
index 3acd7856..4cf825d7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
@@ -199,6 +199,8 @@ public class IcebergSchemaExtractor {
         return Types.DecimalType.of(precision, scale);
       case RECORD:
         return Types.StructType.of(convertFields(field.getSchema(), fieldIdTracker));
+      case UUID:
+        return Types.UUIDType.get();
       case MAP:
         InternalField key =
             field.getSchema().getFields().stream()
@@ -305,7 +307,7 @@ public class IcebergSchemaExtractor {
                 InternalSchema.MetadataKey.FIXED_BYTES_SIZE, fixedType.length());
         break;
       case UUID:
-        type = InternalType.FIXED;
+        type = InternalType.UUID;
         metadata = Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 16);
         break;
       case STRUCT:
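
Note: with the two changes above, Iceberg's native uuid type maps to
InternalType.UUID in both directions instead of being flattened to FIXED on the way
in. A minimal sketch of a source schema that exercises the new path, assuming only
the Iceberg API (the field id and name are illustrative):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.types.Types.NestedField;

    public class IcebergUuidSketch {
      public static void main(String[] args) {
        // A table schema with a single optional UUID column
        Schema schema =
            new Schema(NestedField.optional(1, "uuid_col", Types.UUIDType.get()));
        System.out.println(schema.findType("uuid_col")); // prints: uuid
      }
    }
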
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index c6b75c31..dce0f21a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -127,6 +127,22 @@ public interface GenericTable<T, Q> extends AutoCloseable {
     }
   }
 
+  static GenericTable getInstanceWithUUIDColumns(
+      String tableName,
+      Path tempDir,
+      SparkSession sparkSession,
+      JavaSparkContext jsc,
+      String sourceFormat,
+      boolean isPartitioned) {
+    switch (sourceFormat) {
+      case ICEBERG:
+        return TestIcebergTable.forSchemaWithUUIDColumns(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+      default:
+        throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
+    }
+  }
+
   static String getTableName() {
     return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_");
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 58f0f982..3d539766 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -38,12 +39,14 @@ import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -82,6 +85,10 @@ import org.apache.iceberg.hadoop.HadoopTables;
 
 import org.apache.spark.sql.delta.DeltaLog;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.xtable.conversion.ConversionConfig;
@@ -100,6 +107,7 @@ public class ITConversionController {
   @TempDir public static Path tempDir;
   private static final DateTimeFormatter DATE_FORMAT =
       DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   private static JavaSparkContext jsc;
   private static SparkSession sparkSession;
@@ -142,6 +150,19 @@ public class ITConversionController {
     return arguments.stream();
   }
 
+  private static Stream<Arguments> generateTestParametersForUUID() {
+    List<Arguments> arguments = new ArrayList<>();
+    for (SyncMode syncMode : SyncMode.values()) {
+      for (boolean isPartitioned : new boolean[] {true, false}) {
+        // TODO: Add Hudi UUID support later (https://github.com/apache/incubator-xtable/issues/543)
+        // The current Spark parquet reader cannot handle fixed-size byte arrays with the UUID logical type
+        List<String> targetTableFormats = Arrays.asList(DELTA);
+        arguments.add(Arguments.of(ICEBERG, targetTableFormats, syncMode, isPartitioned));
+      }
+    }
+    return arguments.stream();
+  }
+
   private static Stream<Arguments> testCasesWithSyncModes() {
     return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL));
   }
@@ -261,6 +282,54 @@ public class ITConversionController {
     }
   }
 
+  // This test is a simplified version of testVariousOperations
+  // The difference is that the Iceberg data source contains UUID columns
+  @ParameterizedTest
+  @MethodSource("generateTestParametersForUUID")
+  public void testVariousOperationsWithUUID(
+      String sourceTableFormat,
+      List<String> targetTableFormats,
+      SyncMode syncMode,
+      boolean isPartitioned) {
+    String tableName = getTableName();
+    ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration());
+    String partitionConfig = null;
+    if (isPartitioned) {
+      partitionConfig = "level:VALUE";
+    }
+    ConversionSourceProvider<?> conversionSourceProvider =
+        getConversionSourceProvider(sourceTableFormat);
+    List<?> insertRecords;
+    try (GenericTable table =
+        GenericTable.getInstanceWithUUIDColumns(
+            tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
+      insertRecords = table.insertRows(100);
+
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              syncMode,
+              tableName,
+              table,
+              targetTableFormats,
+              partitionConfig,
+              null);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);
+
+      // Upsert some records and sync again
+      table.upsertRows(insertRecords.subList(0, 20));
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);
+
+      table.deleteRows(insertRecords.subList(30, 50));
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 80);
+      checkDatasetEquivalenceWithFilter(
+          sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("testCasesWithPartitioningAndSyncModes")
   public void testConcurrentInsertWritesInSource(
@@ -797,13 +866,84 @@ public class ITConversionController {
             // if count is not known ahead of time, ensure datasets are non-empty
             assertFalse(dataset1Rows.isEmpty());
           }
+
+          if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
+            compareDatasetWithUUID(dataset1Rows, dataset2Rows);
+          } else {
+            assertEquals(
+                dataset1Rows,
+                dataset2Rows,
+                String.format(
+                    "Datasets are not equivalent when reading from Spark. 
Source: %s, Target: %s",
+                    sourceFormat, format));
+          }
+        });
+  }
+
+  /**
+   * Compares two datasets where dataset1Rows is for Iceberg and dataset2Rows is for other formats
+   * (such as Delta or Hudi).
+   * - For the "uuid_field", if present, the UUID from dataset1 (Iceberg) is compared with the
+   *   Base64-encoded UUID from dataset2 (other formats), after decoding.
+   * - For all other fields, the values are compared directly.
+   * - If neither row contains the "uuid_field", the rows are compared as plain JSON strings.
+   *
+   * @param dataset1Rows List of JSON rows representing the dataset in Iceberg format (UUID is
+   *     stored as a string).
+   * @param dataset2Rows List of JSON rows representing the dataset in other formats (UUID might be
+   *     Base64-encoded).
+   */
+  private void compareDatasetWithUUID(List<String> dataset1Rows, List<String> dataset2Rows) {
+    for (int i = 0; i < dataset1Rows.size(); i++) {
+      String row1 = dataset1Rows.get(i);
+      String row2 = dataset2Rows.get(i);
+      if (row1.contains("uuid_field") && row2.contains("uuid_field")) {
+        try {
+          JsonNode node1 = OBJECT_MAPPER.readTree(row1);
+          JsonNode node2 = OBJECT_MAPPER.readTree(row2);
+
+          // check uuid field
+          String uuidStr1 = node1.get("uuid_field").asText();
+          byte[] bytes = Base64.getDecoder().decode(node2.get("uuid_field").asText());
+          ByteBuffer bb = ByteBuffer.wrap(bytes);
+          UUID uuid2 = new UUID(bb.getLong(), bb.getLong());
+          String uuidStr2 = uuid2.toString();
           assertEquals(
-              dataset1Rows,
-              dataset2Rows,
+              uuidStr1,
+              uuidStr2,
               String.format(
                   "Datasets are not equivalent when reading from Spark. 
Source: %s, Target: %s",
-                  sourceFormat, format));
-        });
+                  uuidStr1, uuidStr2));
+
+          // check other fields
+          ((ObjectNode) node1).remove("uuid_field");
+          ((ObjectNode) node2).remove("uuid_field");
+          assertEquals(
+              node1.toString(),
+              node2.toString(),
+              String.format(
+                  "Datasets are not equivalent when comparing other fields. 
Source: %s, Target: %s",
+                  node1, node2));
+        } catch (JsonProcessingException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        assertEquals(
+            row1,
+            row2,
+            String.format(
+                "Datasets are not equivalent when reading from Spark. Source: 
%s, Target: %s",
+                row1, row2));
+      }
+    }
+  }
+
+  private boolean containsUUIDFields(List<String> rows) {
+    for (String row : rows) {
+      if (row.contains("\"uuid_field\"")) {
+        return true;
+      }
+    }
+    return false;
   }
 
   private static Stream<Arguments> addBasicPartitionCases(Stream<Arguments> arguments) {
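
Note: the comparison above hinges on one conversion. Iceberg reports a UUID as its
canonical string, while the Delta copy of the same value is 16 raw bytes that the
JSON dump Base64-encodes; the test decodes them and rebuilds the UUID from two
longs. A minimal sketch of that byte-level round trip using only the JDK (mirroring
the write side in TestIcebergDataHelper and the read side in compareDatasetWithUUID):

    import java.nio.ByteBuffer;
    import java.util.Base64;
    import java.util.UUID;

    public class UuidBytesSketch {
      public static void main(String[] args) {
        UUID original = UUID.randomUUID();

        // Write side: UUID -> 16 bytes, most-significant long first
        ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
        buf.putLong(original.getMostSignificantBits());
        buf.putLong(original.getLeastSignificantBits());
        String base64 = Base64.getEncoder().encodeToString(buf.array());

        // Read side: Base64 -> 16 bytes -> UUID, as in the test comparison
        ByteBuffer bb = ByteBuffer.wrap(Base64.getDecoder().decode(base64));
        UUID roundTripped = new UUID(bb.getLong(), bb.getLong());
        System.out.println(original.equals(roundTripped)); // prints: true
      }
    }
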
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index bb63667a..0c8336fe 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -88,7 +88,7 @@ public class TestIcebergTable implements GenericTable<Record, String> {
         hadoopConf,
         DEFAULT_RECORD_KEY_FIELD,
         Collections.singletonList(partitionField),
-        false);
+        TestIcebergDataHelper.SchemaType.COMMON);
   }
 
   public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning(
@@ -99,7 +99,18 @@ public class TestIcebergTable implements GenericTable<Record, String> {
         hadoopConf,
         DEFAULT_RECORD_KEY_FIELD,
         Collections.singletonList(partitionField),
-        true);
+        TestIcebergDataHelper.SchemaType.COMMON_WITH_ADDITIONAL_COLUMNS);
+  }
+
+  public static TestIcebergTable forSchemaWithUUIDColumns(
+      String tableName, String partitionField, Path tempDir, Configuration hadoopConf) {
+    return new TestIcebergTable(
+        tableName,
+        tempDir,
+        hadoopConf,
+        DEFAULT_RECORD_KEY_FIELD,
+        Collections.singletonList(partitionField),
+        TestIcebergDataHelper.SchemaType.COMMON_WITH_UUID_COLUMN);
   }
 
   public TestIcebergTable(
@@ -108,12 +119,12 @@ public class TestIcebergTable implements GenericTable<Record, String> {
       Configuration hadoopConf,
       String recordKeyField,
       List<String> partitionFields,
-      boolean includeAdditionalColumns) {
+      TestIcebergDataHelper.SchemaType schemaType) {
     this.tableName = tableName;
     this.basePath = tempDir.toUri().toString();
     this.icebergDataHelper =
         TestIcebergDataHelper.createIcebergDataHelper(
-            recordKeyField, filterNullFields(partitionFields), includeAdditionalColumns);
+            recordKeyField, filterNullFields(partitionFields), schemaType);
     this.schema = icebergDataHelper.getTableSchema();
 
     PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec();
diff --git a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
index 20d41139..0b6823a1 100644
--- a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
@@ -844,4 +844,49 @@ public class TestAvroSchemaConverter {
             .build();
     assertEquals(internalSchema, AvroSchemaConverter.getInstance().toInternalSchema(schemaWithIds));
   }
+
+  @Test
+  public void testIcebergToAvroUUIDSupport() {
+    String schemaName = "testRecord";
+    String doc = "What's up doc";
+    Schema avroRepresentation =
+        new Schema.Parser()
+            .parse(
+                "{\"type\":\"record\",\"name\":\"testRecord\",\"doc\":\"What's 
up doc\",\"fields\":["
+                    + 
"{\"name\":\"requiredUUID\",\"type\":{\"type\":\"fixed\",\"name\":\"required_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}},"
+                    + 
"{\"name\":\"optionalUUID\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"optional_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}],\"default\":null}"
+                    + "]}");
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .name(schemaName)
+            .comment(doc)
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("requiredUUID")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("required_uuid")
+                                .dataType(InternalType.UUID)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalUUID")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("optional_uuid")
+                                .dataType(InternalType.UUID)
+                                .isNullable(true)
+                                .build())
+                        .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build()))
+            .build();
+    assertEquals(
+        avroRepresentation, AvroSchemaConverter.getInstance().fromInternalSchema(internalSchema));
+    assertEquals(
+        internalSchema, AvroSchemaConverter.getInstance().toInternalSchema(avroRepresentation));
+  }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 45c90660..4b0eacd0 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -892,4 +893,46 @@ public class TestDeltaSchemaExtractor {
     Assertions.assertEquals(
         internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
   }
+
+  @Test
+  public void testIcebergToDeltaUUIDSupport() {
+    Metadata metadata =
+        new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
+    StructType structRepresentation =
+        new StructType()
+            .add("requiredUUID", DataTypes.BinaryType, false, metadata)
+            .add("optionalUUID", DataTypes.BinaryType, true, metadata);
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("requiredUUID")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("binary")
+                                .dataType(InternalType.UUID)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalUUID")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("binary")
+                                .dataType(InternalType.UUID)
+                                .isNullable(true)
+                                .build())
+                        .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build()))
+            .build();
+    Assertions.assertEquals(
+        structRepresentation,
+        DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
+    Assertions.assertEquals(
+        internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+  }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
index cf806fd6..1d10fe7a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
@@ -104,12 +104,16 @@ public class TestIcebergDataHelper {
       Arrays.asList(
          NestedField.optional(31, "additional_column1", Types.StringType.get()),
          NestedField.optional(32, "additional_column2", Types.LongType.get()));
+  private static final List<Types.NestedField> UUID_FIELDS =
+      Arrays.asList(NestedField.optional(33, "uuid_field", Types.UUIDType.get()));
   private static final Schema BASE_SCHEMA = new Schema(COMMON_FIELDS);
   private static final Schema SCHEMA_WITH_ADDITIONAL_COLUMNS =
       new Schema(
           Stream.concat(COMMON_FIELDS.stream(), ADDITIONAL_FIELDS.stream())
               .collect(Collectors.toList()));
-
+  private static final Schema SCHEMA_WITH_UUID_COLUMN =
+      new Schema(
+          Stream.concat(COMMON_FIELDS.stream(), UUID_FIELDS.stream()).collect(Collectors.toList()));
   private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
@@ -117,9 +121,15 @@ public class TestIcebergDataHelper {
   String recordKeyField;
   List<String> partitionFieldNames;
 
+  public static enum SchemaType {
+    COMMON,
+    COMMON_WITH_ADDITIONAL_COLUMNS,
+    COMMON_WITH_UUID_COLUMN,
+  }
+
   public static TestIcebergDataHelper createIcebergDataHelper(
-      String recordKeyField, List<String> partitionFields, boolean includeAdditionalColumns) {
-    Schema tableSchema = getSchema(includeAdditionalColumns);
+      String recordKeyField, List<String> partitionFields, SchemaType schemaType) {
+    Schema tableSchema = getSchema(schemaType);
     return TestIcebergDataHelper.builder()
         .tableSchema(tableSchema)
         .recordKeyField(recordKeyField)
@@ -127,8 +137,17 @@ public class TestIcebergDataHelper {
         .build();
   }
 
-  private static Schema getSchema(boolean includeAdditionalColumns) {
-    return includeAdditionalColumns ? SCHEMA_WITH_ADDITIONAL_COLUMNS : BASE_SCHEMA;
+  private static Schema getSchema(SchemaType schemaType) {
+    switch (schemaType) {
+      case COMMON:
+        return BASE_SCHEMA;
+      case COMMON_WITH_ADDITIONAL_COLUMNS:
+        return SCHEMA_WITH_ADDITIONAL_COLUMNS;
+      case COMMON_WITH_UUID_COLUMN:
+        return SCHEMA_WITH_UUID_COLUMN;
+      default:
+        throw new IllegalArgumentException("Unknown schema type: " + schemaType);
+    }
   }
 
   public List<Record> generateInsertRecords(int numRecords) {
@@ -299,7 +318,11 @@ public class TestIcebergDataHelper {
       case STRUCT:
         return generateInsertRecord(timeLowerBound, timeUpperBound, fieldType.asStructType());
       case UUID:
-        return UUID.randomUUID().toString();
+        UUID uuid = UUID.randomUUID();
+        ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+        byteBuffer.putLong(uuid.getMostSignificantBits());
+        byteBuffer.putLong(uuid.getLeastSignificantBits());
+        return byteBuffer.array();
       case LIST:
         Types.ListType listType = (Types.ListType) fieldType;
         int listSize = RANDOM.nextInt(5) + 1;
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
index 7559a5e8..28776541 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
@@ -372,7 +372,7 @@ public class TestIcebergSchemaExtractor {
                         .schema(
                             InternalSchema.builder()
                                 .name("uuid")
-                                .dataType(InternalType.FIXED)
+                                .dataType(InternalType.UUID)
                                 .isNullable(false)
                                 .metadata(fixedMetadata)
                                 .build())
@@ -383,7 +383,7 @@ public class TestIcebergSchemaExtractor {
                         .schema(
                             InternalSchema.builder()
                                 .name("uuid")
-                                .dataType(InternalType.FIXED)
+                                .dataType(InternalType.UUID)
                                 .isNullable(true)
                                 .metadata(fixedMetadata)
                                 .build())
@@ -391,6 +391,7 @@ public class TestIcebergSchemaExtractor {
                         .build()))
             .build();
     assertEquals(expectedSchema, (SCHEMA_EXTRACTOR.fromIceberg(inputSchema)));
+    assertTrue(inputSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(expectedSchema)));
   }
 
   @Test
