the-other-tim-brown commented on code in PR #766:
URL: https://github.com/apache/incubator-xtable/pull/766#discussion_r2638207526


##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java:
##########
@@ -53,18 +54,63 @@
 @Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class IcebergSchemaExtractor {
-  private static final IcebergSchemaExtractor INSTANCE = new 
IcebergSchemaExtractor();
   private static final String MAP_KEY_FIELD_NAME = "key";
   private static final String MAP_VALUE_FIELD_NAME = "value";
   private static final String LIST_ELEMENT_FIELD_NAME = "element";
+  @Getter private final Map<Integer, String> idToStorageName = new HashMap<>();
 
   public static IcebergSchemaExtractor getInstance() {
-    return INSTANCE;
+    return new IcebergSchemaExtractor();
+  }
+
+  private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger 
fieldIdTracker) {
+    schema.getFields().stream()
+        .forEach(
+            field -> {
+              if (field.getFieldId() != null)
+                fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max);
+              initializeFieldIdTracker(field, fieldIdTracker);
+            });
+  }
+
+  private void initializeFieldIdTracker(InternalField field, AtomicInteger 
fieldIdTracker) {
+    switch (field.getSchema().getDataType()) {
+      case RECORD:
+        initializeFieldIdTracker(field.getSchema(), fieldIdTracker);
+        return;
+      case MAP:
+        field.getSchema().getFields().stream()
+            .filter(
+                mapField ->
+                    
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())
+                        || 
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
+            .forEach(
+                mapField -> {
+                  if (mapField.getFieldId() != null)

Review Comment:
   For the `if` statements in this class, let's wrap the body with `{}` so it 
is clearer what is contained in the conditional.



##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java:
##########
@@ -161,12 +169,42 @@ private void initializeTableIfRequired(InternalTable 
internalTable) {
     }
   }
 
+  private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, 
String> updates) {
+    if (mapping == null) {
+      return null;
+    }
+    List<MappedField> fieldResults = new ArrayList<>();
+    for (MappedField field : mapping.fields()) {
+      Set<String> fieldNames = new HashSet<>(field.names());
+      if (updates.containsKey(field.id())) {
+        fieldNames.add(updates.get(field.id()));
+      }
+      MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), 
updates);
+      fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping));
+    }
+    return MappedFields.of(fieldResults);
+  }
+
   @Override
   public void syncSchema(InternalSchema schema) {
     Schema latestSchema = schemaExtractor.toIceberg(schema);
+    String mappingJson = 
transaction.table().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    boolean hasFieldIds =
+        schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != 
null);
+    // Recreate name mapping when field IDs were provided in the source schema 
to ensure every
+    // field in the mapping was assigned the same ID as what is in the source 
schema
+    NameMapping mapping =
+        mappingJson == null || hasFieldIds
+            ? MappingUtil.create(latestSchema)
+            : NameMappingParser.fromJson(mappingJson);

Review Comment:
   Does it make sense to always create the name mapping based on the latest 
schema?
   
   If I remember correctly, this name mapping will always be useful if the 
field IDs are not persisted to the source data files, so I think we'll always 
want an up-to-date mapping.



##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java:
##########
@@ -53,18 +54,63 @@
 @Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class IcebergSchemaExtractor {
-  private static final IcebergSchemaExtractor INSTANCE = new 
IcebergSchemaExtractor();
   private static final String MAP_KEY_FIELD_NAME = "key";
   private static final String MAP_VALUE_FIELD_NAME = "value";
   private static final String LIST_ELEMENT_FIELD_NAME = "element";
+  @Getter private final Map<Integer, String> idToStorageName = new HashMap<>();
 
   public static IcebergSchemaExtractor getInstance() {
-    return INSTANCE;
+    return new IcebergSchemaExtractor();
+  }
+
+  private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger 
fieldIdTracker) {
+    schema.getFields().stream()
+        .forEach(
+            field -> {
+              if (field.getFieldId() != null)
+                fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max);
+              initializeFieldIdTracker(field, fieldIdTracker);
+            });
+  }
+
+  private void initializeFieldIdTracker(InternalField field, AtomicInteger 
fieldIdTracker) {
+    switch (field.getSchema().getDataType()) {

Review Comment:
   Can we add a default case for this switch?



##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java:
##########
@@ -161,12 +169,42 @@ private void initializeTableIfRequired(InternalTable 
internalTable) {
     }
   }
 
+  private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, 
String> updates) {
+    if (mapping == null) {
+      return null;
+    }
+    List<MappedField> fieldResults = new ArrayList<>();
+    for (MappedField field : mapping.fields()) {
+      Set<String> fieldNames = new HashSet<>(field.names());
+      if (updates.containsKey(field.id())) {
+        fieldNames.add(updates.get(field.id()));
+      }
+      MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), 
updates);
+      fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping));
+    }
+    return MappedFields.of(fieldResults);
+  }
+
   @Override
   public void syncSchema(InternalSchema schema) {
     Schema latestSchema = schemaExtractor.toIceberg(schema);
+    String mappingJson = 
transaction.table().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    boolean hasFieldIds =
+        schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != 
null);
+    // Recreate name mapping when field IDs were provided in the source schema 
to ensure every
+    // field in the mapping was assigned the same ID as what is in the source 
schema
+    NameMapping mapping =
+        mappingJson == null || hasFieldIds
+            ? MappingUtil.create(latestSchema)
+            : NameMappingParser.fromJson(mappingJson);
+    mapping =
+        NameMapping.of(
+            updateNameMapping(mapping.asMappedFields(), 
schemaExtractor.getIdToStorageName()));
+    transaction
+        .updateProperties()
+        .set(TableProperties.DEFAULT_NAME_MAPPING, 
NameMappingParser.toJson(mapping))
+        .commit();
     if (!transaction.table().schema().sameSchema(latestSchema)) {

Review Comment:
   I think we should move the properties update to this block as well so we 
don't need to issue as many updates.



##########
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java:
##########
@@ -161,12 +169,42 @@ private void initializeTableIfRequired(InternalTable 
internalTable) {
     }
   }
 
+  private MappedFields updateNameMapping(MappedFields mapping, Map<Integer, 
String> updates) {
+    if (mapping == null) {
+      return null;
+    }
+    List<MappedField> fieldResults = new ArrayList<>();
+    for (MappedField field : mapping.fields()) {
+      Set<String> fieldNames = new HashSet<>(field.names());
+      if (updates.containsKey(field.id())) {
+        fieldNames.add(updates.get(field.id()));
+      }
+      MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), 
updates);
+      fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping));
+    }
+    return MappedFields.of(fieldResults);
+  }
+
   @Override
   public void syncSchema(InternalSchema schema) {
     Schema latestSchema = schemaExtractor.toIceberg(schema);
+    String mappingJson = 
transaction.table().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    boolean hasFieldIds =
+        schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != 
null);
+    // Recreate name mapping when field IDs were provided in the source schema 
to ensure every
+    // field in the mapping was assigned the same ID as what is in the source 
schema
+    NameMapping mapping =
+        mappingJson == null || hasFieldIds
+            ? MappingUtil.create(latestSchema)
+            : NameMappingParser.fromJson(mappingJson);
+    mapping =
+        NameMapping.of(
+            updateNameMapping(mapping.asMappedFields(), 
schemaExtractor.getIdToStorageName()));

Review Comment:
   The `updateNameMapping` method takes in updates, but it seems like it will 
take in all the `idToStorageName` values. Is that intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to