This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push: new a0d86a84 Support UUID type when iceberg to delta a0d86a84 is described below commit a0d86a8489eab74c3e23d08e5790cad488cd7c7a Author: Daniel Tu <danieltu.l...@gmail.com> AuthorDate: Sun Sep 22 22:23:34 2024 -0700 Support UUID type when iceberg to delta --- .../apache/xtable/model/schema/InternalSchema.java | 2 + .../apache/xtable/model/schema/InternalType.java | 1 + .../apache/xtable/avro/AvroSchemaConverter.java | 14 +- .../apache/xtable/delta/DeltaSchemaExtractor.java | 33 ++++- .../xtable/hudi/HudiPartitionValuesExtractor.java | 1 + .../xtable/iceberg/IcebergSchemaExtractor.java | 4 +- .../test/java/org/apache/xtable/GenericTable.java | 16 +++ .../org/apache/xtable/ITConversionController.java | 148 ++++++++++++++++++++- .../java/org/apache/xtable/TestIcebergTable.java | 19 ++- .../xtable/avro/TestAvroSchemaConverter.java | 45 +++++++ .../xtable/delta/TestDeltaSchemaExtractor.java | 43 ++++++ .../xtable/iceberg/TestIcebergDataHelper.java | 35 ++++- .../xtable/iceberg/TestIcebergSchemaExtractor.java | 5 +- 13 files changed, 342 insertions(+), 24 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java index 5e001c6c..20af37e0 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java @@ -78,6 +78,8 @@ public class InternalSchema { MILLIS } + public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType"; + /** * Performs a level-order traversal of the schema and returns a list of all fields. Use this * method to get a list that includes nested fields. 
Use {@link InternalSchema#getFields()} when diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java index c2f1a223..e1b1049d 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java @@ -38,6 +38,7 @@ public enum InternalType { LIST, MAP, UNION, + UUID, FIXED, STRING, BYTES, diff --git a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java index 346dcded..9f40d29e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java @@ -132,8 +132,13 @@ public class AvroSchemaConverter { break; } if (schema.getType() == Schema.Type.FIXED) { - metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize()); - newDataType = InternalType.FIXED; + String xtableLogicalType = schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE); + if ("uuid".equals(xtableLogicalType)) { + newDataType = InternalType.UUID; + } else { + metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, schema.getFixedSize()); + newDataType = InternalType.FIXED; + } } else { newDataType = InternalType.BYTES; } @@ -435,6 +440,11 @@ public class AvroSchemaConverter { Schema.createFixed( internalSchema.getName(), internalSchema.getComment(), null, fixedSize), internalSchema); + case UUID: + Schema uuidSchema = + Schema.createFixed(internalSchema.getName(), internalSchema.getComment(), null, 16); + uuidSchema.addProp(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid"); + return finalizeSchema(uuidSchema, internalSchema); default: throw new UnsupportedSchemaTypeException( "Encountered unhandled type during InternalSchema to Avro conversion: " diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java index fa425ef2..e312761f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java @@ -33,6 +33,7 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DecimalType; import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -73,7 +74,7 @@ public class DeltaSchemaExtractor { field.getName(), convertFieldType(field), field.getSchema().isNullable(), - Metadata.empty())) + getMetaData(field.getSchema().getDataType()))) .toArray(StructField[]::new); return new StructType(fields); } @@ -90,6 +91,7 @@ public class DeltaSchemaExtractor { return DataTypes.LongType; case BYTES: case FIXED: + case UUID: return DataTypes.BinaryType; case BOOLEAN: return DataTypes.BooleanType; @@ -142,12 +144,24 @@ public class DeltaSchemaExtractor { } } + private Metadata getMetaData(InternalType type) { + if (type == InternalType.UUID) { + return new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build(); + } else { + return Metadata.empty(); + } + } + public InternalSchema toInternalSchema(StructType structType) { - return toInternalSchema(structType, null, false, null); + return toInternalSchema(structType, null, false, null, null); } private InternalSchema toInternalSchema( - DataType dataType, String parentPath, boolean nullable, String comment) { + DataType dataType, + String parentPath, + boolean nullable, + String comment, + Metadata originalMetadata) { Map<InternalSchema.MetadataKey, Object> metadata = null; List<InternalField> fields = null; InternalType type; @@ -172,7 +186,12 @@ public class 
DeltaSchemaExtractor { type = InternalType.DOUBLE; break; case "binary": - type = InternalType.BYTES; + if (originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE) + && "uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) { + type = InternalType.UUID; + } else { + type = InternalType.BYTES; + } break; case "long": type = InternalType.LONG; @@ -210,7 +229,8 @@ public class DeltaSchemaExtractor { field.dataType(), SchemaUtils.getFullyQualifiedPath(parentPath, field.name()), field.nullable(), - fieldComment); + fieldComment, + field.metadata()); return InternalField.builder() .name(field.name()) .fieldId(fieldId) @@ -238,6 +258,7 @@ public class DeltaSchemaExtractor { SchemaUtils.getFullyQualifiedPath( parentPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME), arrayType.containsNull(), + null, null); InternalField elementField = InternalField.builder() @@ -256,6 +277,7 @@ public class DeltaSchemaExtractor { SchemaUtils.getFullyQualifiedPath( parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME), false, + null, null); InternalField keyField = InternalField.builder() @@ -269,6 +291,7 @@ public class DeltaSchemaExtractor { SchemaUtils.getFullyQualifiedPath( parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME), mapType.valueContainsNull(), + null, null); InternalField valueField = InternalField.builder() diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java index 0b95e65e..a55968d9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java @@ -159,6 +159,7 @@ public class HudiPartitionValuesExtractor { break; case FIXED: case BYTES: + case UUID: parsedValue = valueAsString.getBytes(StandardCharsets.UTF_8); break; case BOOLEAN: diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java index 3acd7856..4cf825d7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java @@ -199,6 +199,8 @@ public class IcebergSchemaExtractor { return Types.DecimalType.of(precision, scale); case RECORD: return Types.StructType.of(convertFields(field.getSchema(), fieldIdTracker)); + case UUID: + return Types.UUIDType.get(); case MAP: InternalField key = field.getSchema().getFields().stream() @@ -305,7 +307,7 @@ public class IcebergSchemaExtractor { InternalSchema.MetadataKey.FIXED_BYTES_SIZE, fixedType.length()); break; case UUID: - type = InternalType.FIXED; + type = InternalType.UUID; metadata = Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 16); break; case STRUCT: diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java index c6b75c31..dce0f21a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java @@ -127,6 +127,22 @@ public interface GenericTable<T, Q> extends AutoCloseable { } } + static GenericTable getInstanceWithUUIDColumns( + String tableName, + Path tempDir, + SparkSession sparkSession, + JavaSparkContext jsc, + String sourceFormat, + boolean isPartitioned) { + switch (sourceFormat) { + case ICEBERG: + return TestIcebergTable.forSchemaWithUUIDColumns( + tableName, isPartitioned ? 
"level" : null, tempDir, jsc.hadoopConfiguration()); + default: + throw new IllegalArgumentException("Unsupported source format: " + sourceFormat); + } + } + static String getTableName() { return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_"); } diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 58f0f982..3d539766 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -38,12 +39,14 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -82,6 +85,10 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.spark.sql.delta.DeltaLog; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import org.apache.xtable.conversion.ConversionConfig; @@ -100,6 +107,7 @@ public class ITConversionController { @TempDir public static Path tempDir; private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss.SSS").withZone(ZoneId.of("UTC")); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static JavaSparkContext jsc; private static SparkSession sparkSession; @@ -142,6 +150,19 @@ public class ITConversionController { return arguments.stream(); } + private static Stream<Arguments> generateTestParametersForUUID() { + List<Arguments> arguments = new ArrayList<>(); + for (SyncMode syncMode : SyncMode.values()) { + for (boolean isPartitioned : new boolean[] {true, false}) { + // TODO: Add Hudi UUID support later (https://github.com/apache/incubator-xtable/issues/543) + // Current spark parquet reader can not handle fix-size byte array with UUID logic type + List<String> targetTableFormats = Arrays.asList(DELTA); + arguments.add(Arguments.of(ICEBERG, targetTableFormats, syncMode, isPartitioned)); + } + } + return arguments.stream(); + } + private static Stream<Arguments> testCasesWithSyncModes() { return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL)); } @@ -261,6 +282,54 @@ public class ITConversionController { } } + // The test content is the simplified version of testVariousOperations + // The difference is that the data source from Iceberg contains UUID columns + @ParameterizedTest + @MethodSource("generateTestParametersForUUID") + public void testVariousOperationsWithUUID( + String sourceTableFormat, + List<String> targetTableFormats, + SyncMode syncMode, + boolean isPartitioned) { + String tableName = getTableName(); + ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); + String partitionConfig = null; + if (isPartitioned) { + partitionConfig = "level:VALUE"; + } + ConversionSourceProvider<?> conversionSourceProvider = + getConversionSourceProvider(sourceTableFormat); + List<?> insertRecords; + try (GenericTable table = + GenericTable.getInstanceWithUUIDColumns( + tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) { + insertRecords = 
table.insertRows(100); + + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig, + null); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100); + + // Upsert some records and sync again + table.upsertRows(insertRecords.subList(0, 20)); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100); + + table.deleteRows(insertRecords.subList(30, 50)); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 80); + checkDatasetEquivalenceWithFilter( + sourceTableFormat, table, targetTableFormats, table.getFilterQuery()); + } + } + @ParameterizedTest @MethodSource("testCasesWithPartitioningAndSyncModes") public void testConcurrentInsertWritesInSource( @@ -797,13 +866,84 @@ public class ITConversionController { // if count is not known ahead of time, ensure datasets are non-empty assertFalse(dataset1Rows.isEmpty()); } + + if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) { + compareDatasetWithUUID(dataset1Rows, dataset2Rows); + } else { + assertEquals( + dataset1Rows, + dataset2Rows, + String.format( + "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s", + sourceFormat, format)); + } + }); + } + + /** + * Compares two datasets where dataset1Rows is for Iceberg and dataset2Rows is for other formats + * (such as Delta or Hudi). - For the "uuid_field", if present, the UUID from dataset1 (Iceberg) + * is compared with the Base64-encoded UUID from dataset2 (other formats), after decoding. - For + * all other fields, the values are compared directly. - If neither row contains the "uuid_field", + * the rows are compared as plain JSON strings. 
+ * + * @param dataset1Rows List of JSON rows representing the dataset in Iceberg format (UUID is + * stored as a string). + * @param dataset2Rows List of JSON rows representing the dataset in other formats (UUID might be + * Base64-encoded). + */ + private void compareDatasetWithUUID(List<String> dataset1Rows, List<String> dataset2Rows) { + for (int i = 0; i < dataset1Rows.size(); i++) { + String row1 = dataset1Rows.get(i); + String row2 = dataset2Rows.get(i); + if (row1.contains("uuid_field") && row2.contains("uuid_field")) { + try { + JsonNode node1 = OBJECT_MAPPER.readTree(row1); + JsonNode node2 = OBJECT_MAPPER.readTree(row2); + + // check uuid field + String uuidStr1 = node1.get("uuid_field").asText(); + byte[] bytes = Base64.getDecoder().decode(node2.get("uuid_field").asText()); + ByteBuffer bb = ByteBuffer.wrap(bytes); + UUID uuid2 = new UUID(bb.getLong(), bb.getLong()); + String uuidStr2 = uuid2.toString(); assertEquals( - dataset1Rows, - dataset2Rows, + uuidStr1, + uuidStr2, String.format( "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s", - sourceFormat, format)); - }); + uuidStr1, uuidStr2)); + + // check other fields + ((ObjectNode) node1).remove("uuid_field"); + ((ObjectNode) node2).remove("uuid_field"); + assertEquals( + node1.toString(), + node2.toString(), + String.format( + "Datasets are not equivalent when comparing other fields. Source: %s, Target: %s", + node1, node2)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } else { + assertEquals( + row1, + row2, + String.format( + "Datasets are not equivalent when reading from Spark. 
Source: %s, Target: %s", + row1, row2)); + } + } + } + + private boolean containsUUIDFields(List<String> rows) { + for (String row : rows) { + if (row.contains("\"uuid_field\"")) { + return true; + } + } + return false; } private static Stream<Arguments> addBasicPartitionCases(Stream<Arguments> arguments) { diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java index bb63667a..0c8336fe 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java @@ -88,7 +88,7 @@ public class TestIcebergTable implements GenericTable<Record, String> { hadoopConf, DEFAULT_RECORD_KEY_FIELD, Collections.singletonList(partitionField), - false); + TestIcebergDataHelper.SchemaType.COMMON); } public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning( @@ -99,7 +99,18 @@ public class TestIcebergTable implements GenericTable<Record, String> { hadoopConf, DEFAULT_RECORD_KEY_FIELD, Collections.singletonList(partitionField), - true); + TestIcebergDataHelper.SchemaType.COMMON_WITH_ADDITIONAL_COLUMNS); + } + + public static TestIcebergTable forSchemaWithUUIDColumns( + String tableName, String partitionField, Path tempDir, Configuration hadoopConf) { + return new TestIcebergTable( + tableName, + tempDir, + hadoopConf, + DEFAULT_RECORD_KEY_FIELD, + Collections.singletonList(partitionField), + TestIcebergDataHelper.SchemaType.COMMON_WITH_UUID_COLUMN); } public TestIcebergTable( @@ -108,12 +119,12 @@ public class TestIcebergTable implements GenericTable<Record, String> { Configuration hadoopConf, String recordKeyField, List<String> partitionFields, - boolean includeAdditionalColumns) { + TestIcebergDataHelper.SchemaType schemaType) { this.tableName = tableName; this.basePath = tempDir.toUri().toString(); this.icebergDataHelper = TestIcebergDataHelper.createIcebergDataHelper( - recordKeyField, 
filterNullFields(partitionFields), includeAdditionalColumns); + recordKeyField, filterNullFields(partitionFields), schemaType); this.schema = icebergDataHelper.getTableSchema(); PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec(); diff --git a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java index 20d41139..0b6823a1 100644 --- a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java @@ -844,4 +844,49 @@ public class TestAvroSchemaConverter { .build(); assertEquals(internalSchema, AvroSchemaConverter.getInstance().toInternalSchema(schemaWithIds)); } + + @Test + public void testIcebergToAvroUUIDSupport() { + String schemaName = "testRecord"; + String doc = "What's up doc"; + Schema avroRepresentation = + new Schema.Parser() + .parse( + "{\"type\":\"record\",\"name\":\"testRecord\",\"doc\":\"What's up doc\",\"fields\":[" + + "{\"name\":\"requiredUUID\",\"type\":{\"type\":\"fixed\",\"name\":\"required_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}}," + + "{\"name\":\"optionalUUID\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"optional_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}],\"default\":null}" + + "]}"); + InternalSchema internalSchema = + InternalSchema.builder() + .name(schemaName) + .comment(doc) + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("requiredUUID") + .schema( + InternalSchema.builder() + .name("required_uuid") + .dataType(InternalType.UUID) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("optionalUUID") + .schema( + InternalSchema.builder() + .name("optional_uuid") + .dataType(InternalType.UUID) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + 
assertEquals( + avroRepresentation, AvroSchemaConverter.getInstance().fromInternalSchema(internalSchema)); + assertEquals( + internalSchema, AvroSchemaConverter.getInstance().toInternalSchema(avroRepresentation)); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java index 45c90660..4b0eacd0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -892,4 +893,46 @@ public class TestDeltaSchemaExtractor { Assertions.assertEquals( internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); } + + @Test + public void testIcebergToDeltaUUIDSupport() { + Metadata metadata = + new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build(); + StructType structRepresentation = + new StructType() + .add("requiredUUID", DataTypes.BinaryType, false, metadata) + .add("optionalUUID", DataTypes.BinaryType, true, metadata); + InternalSchema internalSchema = + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("requiredUUID") + .schema( + InternalSchema.builder() + .name("binary") + .dataType(InternalType.UUID) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("optionalUUID") + .schema( + InternalSchema.builder() + .name("binary") + .dataType(InternalType.UUID) + .isNullable(true) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) 
+ .build())) + .build(); + Assertions.assertEquals( + structRepresentation, + DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema)); + Assertions.assertEquals( + internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java index cf806fd6..1d10fe7a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java @@ -104,12 +104,16 @@ public class TestIcebergDataHelper { Arrays.asList( NestedField.optional(31, "additional_column1", Types.StringType.get()), NestedField.optional(32, "additional_column2", Types.LongType.get())); + private static final List<Types.NestedField> UUID_FIELDS = + Arrays.asList(NestedField.optional(33, "uuid_field", Types.UUIDType.get())); private static final Schema BASE_SCHEMA = new Schema(COMMON_FIELDS); private static final Schema SCHEMA_WITH_ADDITIONAL_COLUMNS = new Schema( Stream.concat(COMMON_FIELDS.stream(), ADDITIONAL_FIELDS.stream()) .collect(Collectors.toList())); - + private static final Schema SCHEMA_WITH_UUID_COLUMN = + new Schema( + Stream.concat(COMMON_FIELDS.stream(), UUID_FIELDS.stream()).collect(Collectors.toList())); private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); @@ -117,9 +121,15 @@ public class TestIcebergDataHelper { String recordKeyField; List<String> partitionFieldNames; + public static enum SchemaType { + COMMON, + COMMON_WITH_ADDITIONAL_COLUMNS, + COMMON_WITH_UUID_COLUMN, + } + public static TestIcebergDataHelper createIcebergDataHelper( - String recordKeyField, List<String> partitionFields, boolean includeAdditionalColumns) { - Schema tableSchema = 
getSchema(includeAdditionalColumns); + String recordKeyField, List<String> partitionFields, SchemaType schemaType) { + Schema tableSchema = getSchema(schemaType); return TestIcebergDataHelper.builder() .tableSchema(tableSchema) .recordKeyField(recordKeyField) @@ -127,8 +137,17 @@ public class TestIcebergDataHelper { .build(); } - private static Schema getSchema(boolean includeAdditionalColumns) { - return includeAdditionalColumns ? SCHEMA_WITH_ADDITIONAL_COLUMNS : BASE_SCHEMA; + private static Schema getSchema(SchemaType schemaType) { + switch (schemaType) { + case COMMON: + return BASE_SCHEMA; + case COMMON_WITH_ADDITIONAL_COLUMNS: + return SCHEMA_WITH_ADDITIONAL_COLUMNS; + case COMMON_WITH_UUID_COLUMN: + return SCHEMA_WITH_UUID_COLUMN; + default: + throw new IllegalArgumentException("Unknown schema type: " + schemaType); + } } public List<Record> generateInsertRecords(int numRecords) { @@ -299,7 +318,11 @@ public class TestIcebergDataHelper { case STRUCT: return generateInsertRecord(timeLowerBound, timeUpperBound, fieldType.asStructType()); case UUID: - return UUID.randomUUID().toString(); + UUID uuid = UUID.randomUUID(); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(uuid.getMostSignificantBits()); + byteBuffer.putLong(uuid.getLeastSignificantBits()); + return byteBuffer.array(); case LIST: Types.ListType listType = (Types.ListType) fieldType; int listSize = RANDOM.nextInt(5) + 1; diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java index 7559a5e8..28776541 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java @@ -372,7 +372,7 @@ public class TestIcebergSchemaExtractor { .schema( InternalSchema.builder() .name("uuid") - .dataType(InternalType.FIXED) + 
.dataType(InternalType.UUID) .isNullable(false) .metadata(fixedMetadata) .build()) @@ -383,7 +383,7 @@ public class TestIcebergSchemaExtractor { .schema( InternalSchema.builder() .name("uuid") - .dataType(InternalType.FIXED) + .dataType(InternalType.UUID) .isNullable(true) .metadata(fixedMetadata) .build()) @@ -391,6 +391,7 @@ public class TestIcebergSchemaExtractor { .build())) .build(); assertEquals(expectedSchema, (SCHEMA_EXTRACTOR.fromIceberg(inputSchema))); + assertTrue(inputSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(expectedSchema))); } @Test