This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f4dabcd96c IGNITE-26520 Migration Tools: Refactor SqlDdlGenerator 
(#6656)
2f4dabcd96c is described below

commit 2f4dabcd96c77a0941e80a1f1cf3a5bcb18acd81
Author: Tiago Marques Godinho <[email protected]>
AuthorDate: Tue Sep 30 11:28:27 2025 +0100

    IGNITE-26520 Migration Tools: Refactor SqlDdlGenerator (#6656)
---
 .../migration-tools-commons-tests/build.gradle     |   6 +-
 .../tests/models/InterceptingFieldsModel.java      | 112 ++++++
 .../sql/FieldNameConflictException.java            |   3 +-
 .../ignite/migrationtools/sql/SqlDdlGenerator.java | 403 ++++++++++-----------
 .../PersistentTableTypeRegistryImpl.java           |  67 +++-
 .../RegisterOnlyTableTypeRegistry.java             |   9 +-
 .../tablemanagement/TableTypeDescriptor.java       |  78 ++++
 .../tablemanagement/TableTypeRegistry.java         |  11 +-
 .../tablemanagement/TableTypeRegistryMapImpl.java  |  19 +-
 .../tablemanagement/TableTypeRegistryUtils.java    |  48 +++
 .../migrationtools/types/InspectedField.java       | 159 ++++++++
 .../InspectedFieldType.java}                       |  29 +-
 .../ignite/migrationtools/types/TypeInspector.java | 138 +++++++
 .../sql/sql/SqlDdlGeneratorTest.java               | 225 ++++++++++--
 .../migrationtools/types/TypeInspectorTest.java    | 114 ++++++
 .../persistence/Ignite2PersistentCacheTools.java   |   2 +-
 16 files changed, 1118 insertions(+), 305 deletions(-)

diff --git a/migration-tools/modules/migration-tools-commons-tests/build.gradle 
b/migration-tools/modules/migration-tools-commons-tests/build.gradle
index 184836bf170..214e1f25cc7 100644
--- a/migration-tools/modules/migration-tools-commons-tests/build.gradle
+++ b/migration-tools/modules/migration-tools-commons-tests/build.gradle
@@ -46,7 +46,11 @@ def unpackTask = 
tasks.register('unpackClassesFromDependencies', Copy) {
     }
 
     into layout.buildDirectory.dir("generated/$name")
-    include "org/apache/ignite/cache/affinity/AffinityKey.class", 
"org/apache/ignite/cache/affinity/AffinityKeyMapped.class", 
"org/apache/ignite/binary/Binarylizable.class"
+    include "org/apache/ignite/cache/affinity/AffinityKey.class",
+            "org/apache/ignite/cache/affinity/AffinityKeyMapped.class",
+            "org/apache/ignite/binary/Binarylizable.class",
+            "org/apache/ignite/cache/query/annotations/QuerySqlField.class",
+            
"org/apache/ignite/cache/query/annotations/QuerySqlField\$Group.class"
 }
 
 compileJava {
diff --git 
a/migration-tools/modules/migration-tools-commons-tests/src/main/java/org/apache/ignite/migrationtools/tests/models/InterceptingFieldsModel.java
 
b/migration-tools/modules/migration-tools-commons-tests/src/main/java/org/apache/ignite/migrationtools/tests/models/InterceptingFieldsModel.java
new file mode 100644
index 00000000000..477efeb36b8
--- /dev/null
+++ 
b/migration-tools/modules/migration-tools-commons-tests/src/main/java/org/apache/ignite/migrationtools/tests/models/InterceptingFieldsModel.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.migrationtools.tests.models;
+
+import java.util.Objects;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+
+/** Provides two models with overlapping field names (key1 and key2 appear in both Key and Value). */
+public class InterceptingFieldsModel {
+    /** Key model. */
+    public static class Key {
+        private int key1;
+
+        private int key2;
+
+        public Key() {
+            // No-op
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param key1 key1
+         * @param key2 key2
+         */
+        public Key(int key1, int key2) {
+            this.key1 = key1;
+            this.key2 = key2;
+        }
+
+        public int key1() {
+            return key1;
+        }
+
+        public int key2() {
+            return key2;
+        }
+    }
+
+    /** Value model. */
+    public static class Value {
+        @QuerySqlField
+        private long key1;
+
+        @QuerySqlField
+        private long key2;
+
+        @QuerySqlField
+        private String value;
+
+        public Value() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param key1 key1
+         * @param key2 key2
+         * @param value val
+         */
+        public Value(long key1, long key2, String value) {
+            this.key1 = key1;
+            this.key2 = key2;
+            this.value = value;
+        }
+
+        public long key1() {
+            return key1;
+        }
+
+        public long key2() {
+            return key2;
+        }
+
+        public String value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Value value1 = (Value) o;
+            return key1 == value1.key1 && key2 == value1.key2 && 
Objects.equals(value, value1.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key1, key2, value);
+        }
+    }
+}
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/FieldNameConflictException.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/FieldNameConflictException.java
index 44cc9ab0102..69dd4c9128c 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/FieldNameConflictException.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/FieldNameConflictException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.migrationtools.sql;
 
 import java.util.Set;
+import org.apache.ignite.migrationtools.types.InspectedField;
 
 /** Defines a situation when it was not possible to assign a field name due to 
a conflict with other fields. */
 public class FieldNameConflictException extends RuntimeException {
@@ -26,7 +27,7 @@ public class FieldNameConflictException extends 
RuntimeException {
         super(message);
     }
 
-    public FieldNameConflictException(SqlDdlGenerator.InspectedField 
inspectedField, Set<String> knownFieldNames) {
+    public FieldNameConflictException(InspectedField inspectedField, 
Set<String> knownFieldNames) {
         super("Duplicated field name for:" + inspectedField + " in " + 
knownFieldNames);
     }
 
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/SqlDdlGenerator.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/SqlDdlGenerator.java
index 6aeeaa27adc..d063ef4f5fc 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/SqlDdlGenerator.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/sql/SqlDdlGenerator.java
@@ -19,16 +19,7 @@ package org.apache.ignite.migrationtools.sql;
 
 import static java.util.function.Predicate.not;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Modifier;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
 import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,22 +29,25 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.commons.lang3.tuple.Triple;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.migrationtools.tablemanagement.TableTypeDescriptor;
 import org.apache.ignite.migrationtools.tablemanagement.TableTypeRegistry;
 import 
org.apache.ignite.migrationtools.tablemanagement.TableTypeRegistryMapImpl;
+import org.apache.ignite.migrationtools.tablemanagement.TableTypeRegistryUtils;
+import org.apache.ignite.migrationtools.types.InspectedField;
+import org.apache.ignite.migrationtools.types.InspectedFieldType;
+import org.apache.ignite.migrationtools.types.TypeInspector;
 import org.apache.ignite.migrationtools.utils.ClassnameUtils;
 import org.apache.ignite3.catalog.ColumnType;
 import org.apache.ignite3.catalog.definitions.ColumnDefinition;
 import org.apache.ignite3.catalog.definitions.TableDefinition;
 import org.apache.ignite3.internal.catalog.sql.CatalogExtensions;
-import org.apache.ignite3.table.mapper.Mapper;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -279,46 +273,6 @@ public class SqlDdlGenerator {
 
     private static final int DEFAULT_BINARY_FIELD_LENGTH = 1024;
 
-    private static final Map<Class<?>, ColumnType<?>> COL_TYPE_REF;
-
-    static {
-        try {
-            COL_TYPE_REF = (Map<Class<?>, ColumnType<?>>) 
FieldUtils.readDeclaredStaticField(ColumnType.class, "TYPES", true);
-            // TODO: IGNITE-25351 Remove
-            COL_TYPE_REF.remove(java.util.Date.class);
-
-            var constructor = 
ColumnType.class.getDeclaredConstructor(Class.class, String.class);
-            constructor.setAccessible(true);
-            constructor.newInstance(Character.class, "CHAR");
-            constructor.newInstance(BitSet.class, "VARBINARY");
-            constructor.newInstance(LocalTime.class, "TIME");
-            constructor.newInstance(LocalDate.class, "DATE");
-            constructor.newInstance(LocalDateTime.class, "TIMESTAMP");
-            constructor.newInstance(Instant.class, "TIMESTAMP");
-            constructor.newInstance(java.util.Date.class, "TIMESTAMP");
-            constructor.newInstance(Enum.class, "VARCHAR");
-            // TODO: IGNITE-25351 Remove
-            constructor.newInstance(java.sql.Date.class, "DATE");
-            constructor.newInstance(java.sql.Time.class, "TIME");
-            constructor.newInstance(java.sql.Timestamp.class, "TIMESTAMP");
-            // Collections
-            constructor.newInstance(Collection.class, "VARBINARY");
-            constructor.newInstance(List.class, "VARBINARY");
-            constructor.newInstance(Set.class, "VARBINARY");
-            // Primitive Arrays
-            constructor.newInstance(boolean[].class, "VARBINARY");
-            constructor.newInstance(char[].class, "VARBINARY");
-            constructor.newInstance(short[].class, "VARBINARY");
-            constructor.newInstance(int[].class, "VARBINARY");
-            constructor.newInstance(long[].class, "VARBINARY");
-            constructor.newInstance(float[].class, "VARBINARY");
-            constructor.newInstance(double[].class, "VARBINARY");
-            constructor.newInstance(String[].class, "VARBINARY");
-        } catch (IllegalAccessException | NoSuchMethodException | 
InvocationTargetException | InstantiationException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     private final ClassLoader clientClassLoader;
 
     private final TableTypeRegistry tableTypeRegistry;
@@ -350,54 +304,23 @@ public class SqlDdlGenerator {
         this.allowExtraFields = allowExtraFields;
     }
 
-    private static boolean isPrimitiveType(Class<?> type) {
-        return type.isEnum() || Mapper.nativelySupported(type) || 
COL_TYPE_REF.containsKey(type);
-    }
-
-    private static List<InspectedField> inspectType(Class<?> type) {
-        Class<?> rootType = ClassUtils.primitiveToWrapper(type);
-        String rootTypeName = rootType.getName();
-
-        if (rootType.isArray() || Collection.class.isAssignableFrom(rootType)) 
{
-            return Collections.singletonList(new InspectedField(null, 
rootTypeName, FieldType.ARRAY));
-        } else if (isPrimitiveType(rootType)) {
-            return Collections.singletonList(new InspectedField(null, 
rootTypeName, FieldType.PRIMITIVE));
-        } else {
-            Field[] fields = rootType.getDeclaredFields();
-            List<InspectedField> ret = new ArrayList<>(fields.length);
-            for (Field field : fields) {
-                Class<?> origFieldType = field.getType();
-                Class<?> wrappedFieldType = 
ClassUtils.primitiveToWrapper(origFieldType);
-                if (shouldPersistField(field) && 
isPrimitiveType(wrappedFieldType)) {
-                    boolean nullable = !origFieldType.isPrimitive();
-                    ret.add(new InspectedField(field.getName(), 
wrappedFieldType.getName(), FieldType.POJO_ATTRIBUTE, nullable));
-                }
-            }
-
-            return ret;
-        }
-    }
-
-    private static void addToQueryEntity(QueryEntity qe, InspectedField 
inspectedField, boolean nullable) {
+    private static void addToQueryEntity(
+            QueryEntity qe,
+            String columnName,
+            String typeName,
+            InspectedFieldType fieldType,
+            boolean nullable
+    ) {
         // Set precision
-        if (inspectedField.fieldType == FieldType.ARRAY) {
-            qe.getFieldsPrecision().putIfAbsent(inspectedField.fieldName, 
DEFAULT_BINARY_FIELD_LENGTH);
+        if (fieldType == InspectedFieldType.ARRAY) {
+            qe.getFieldsPrecision().putIfAbsent(columnName, 
DEFAULT_BINARY_FIELD_LENGTH);
         }
 
         if (!nullable) {
-            qe.getNotNullFields().add(inspectedField.fieldName);
+            qe.getNotNullFields().add(columnName);
         }
 
-        qe.getFields().put(inspectedField.fieldName, inspectedField.typeName);
-    }
-
-    private static void addToQueryEntity(QueryEntity qe, InspectedField 
inspectedField) {
-        addToQueryEntity(qe, inspectedField, inspectedField.isNullable());
-    }
-
-    private static boolean shouldPersistField(Field field) {
-        var mods = field.getModifiers();
-        return !Modifier.isStatic(mods) && !Modifier.isTransient(mods);
+        qe.getFields().put(columnName, typeName);
     }
 
     private static String sanitizeColumnName(String columnName) {
@@ -442,14 +365,14 @@ public class SqlDdlGenerator {
     private List<InspectedField> inspectTypeName(String typeName, String 
typeDescr) {
         try {
             Class<?> type = ClassUtils.getClass(this.clientClassLoader, 
typeName);
-            return inspectType(type);
+            return TypeInspector.inspectType(type);
         } catch (ClassNotFoundException e) {
             LOGGER.warn("Could not find {} class to enrich the QueryEntity: 
{}", typeDescr, typeName);
             return Collections.emptyList();
         }
     }
 
-    private QueryEntity populateQueryEntity(QueryEntity qe, boolean 
allowExtraFields) throws FieldNameConflictException {
+    private QueryEntityEvaluation populateQueryEntity(QueryEntity qe, boolean 
allowExtraFields) throws FieldNameConflictException {
         // Make sure QE has non-null maps
         {
             if (qe.getNotNullFields() == null) {
@@ -509,76 +432,128 @@ public class SqlDdlGenerator {
             qe.getNotNullFields().addAll(qe.getKeyFields());
         }
 
+        @Nullable Map<InspectedField, String> keyFieldToColumnMap;
+        @Nullable Map<InspectedField, String> valFieldToColumnMap;
+
+        Predicate<InspectedField> isNestedPojo = f -> f.fieldType() != 
InspectedFieldType.NESTED_POJO_ATTRIBUTE;
+
         // Inspect classes that are on the classpath
         List<InspectedField> keyFields = inspectTypeName(qe.getKeyType(), 
"KEY");
+        if (keyFields.isEmpty()) {
+            keyFieldToColumnMap = null;
+        } else {
+            keyFields = keyFields.stream()
+                    .filter(isNestedPojo)
+                    .collect(Collectors.toList());
+
+            keyFieldToColumnMap = new HashMap<>();
+        }
+
         List<InspectedField> valFields = inspectTypeName(qe.getValueType(), 
"VALUE");
+        if (valFields.isEmpty()) {
+            valFieldToColumnMap = null;
+        } else {
+            valFields = valFields.stream()
+                    .filter(isNestedPojo)
+                    .collect(Collectors.toList());
+
+            valFieldToColumnMap = new HashMap<>();
+        }
 
         // Check duplicated field names, and assign custom field names
         {
+            class Entry {
+                private final InspectedField inspectedField;
+                private final Supplier<Map.Entry<String, String>> 
keyFieldSupplier;
+                private final Supplier<String> fieldNameCandidateSupplier;
+                private final Map<InspectedField, String> fieldToColumnMap;
+
+                private Entry(
+                        InspectedField inspectedField,
+                        Supplier<Map.Entry<String, String>> keyFieldSupplier,
+                        Supplier<String> fieldNameCandidateSupplier,
+                        Map<InspectedField, String> fieldToColumnMap
+                ) {
+                    this.inspectedField = inspectedField;
+                    this.keyFieldSupplier = keyFieldSupplier;
+                    this.fieldNameCandidateSupplier = 
fieldNameCandidateSupplier;
+                    this.fieldToColumnMap = fieldToColumnMap;
+                }
+            }
+
             Set<String> fieldNames = new HashSet<>(qe.getFields().size() + 
keyFields.size() + valFields.size());
-            List<Map.Entry<InspectedField, Supplier<String>>> unnamedFields = 
new ArrayList<>(keyFields.size() + valFields.size());
+            List<Entry> unnamedFields = new ArrayList<>(keyFields.size() + 
valFields.size());
 
             Supplier<String> keyFieldNameCandidates = new 
FieldNameCandidateSupplier(ID_FIELD_NAME_CANDIDATES, n -> "KEY_" + n);
             Supplier<String> valFieldCandidates = new 
FieldNameCandidateSupplier(VALUE_FIELD_NAME_CANDIDATES, n -> "VAL_" + n);
-            Stream<Triple<InspectedField, Supplier<Map.Entry<String, String>>, 
Supplier<String>>> x = Stream.concat(
-                    keyFields.stream().map(f -> Triple.of(f, () -> 
keyField(qe), keyFieldNameCandidates)),
-                    valFields.stream().map(f -> Triple.of(f, () -> 
valField(qe), valFieldCandidates))
-            );
+
+            Function<InspectedField, Entry> keyMapper = f -> new Entry(f, () 
-> keyField(qe), keyFieldNameCandidates, keyFieldToColumnMap);
+            Function<InspectedField, Entry> valMapper = f -> new Entry(f, () 
-> valField(qe), valFieldCandidates, valFieldToColumnMap);
+
+            // Fields with annotations have precedence, so that they end up in 
the table with the original name.
+            Stream<Entry> x = Stream.of(
+                    
keyFields.stream().filter(InspectedField::hasAnnotation).map(keyMapper),
+                    
valFields.stream().filter(InspectedField::hasAnnotation).map(valMapper),
+                    
keyFields.stream().filter(not(InspectedField::hasAnnotation)).map(keyMapper),
+                    
valFields.stream().filter(not(InspectedField::hasAnnotation)).map(valMapper)
+            ).flatMap(s -> s);
 
             // Add fields already in the QE
             // Also add the aliases, we don't need collisions on that either.
             Stream.concat(qe.getFields().keySet().stream(), 
qe.getAliases().values().stream())
-                    .map(String::toUpperCase).forEach(fieldNames::add);
+                    .map(String::toUpperCase)
+                    .forEach(fieldNames::add);
 
-            for (Iterator<Triple<InspectedField, Supplier<Map.Entry<String, 
String>>, Supplier<String>>> it = x.iterator();
-                    it.hasNext(); ) {
-                Triple<InspectedField, Supplier<Map.Entry<String, String>>, 
Supplier<String>> entry = it.next();
-                InspectedField inspectedField = entry.getLeft();
+            for (Iterator<Entry> it = x.iterator(); it.hasNext(); ) {
+                Entry entry = it.next();
+                InspectedField inspectedField = entry.inspectedField;
 
-                if (inspectedField.fieldName != null) {
-                    String fieldNameUpperCase = 
inspectedField.fieldName.toUpperCase();
+                // TODO: May be refactored
+                @Nullable String fieldName = inspectedField.fieldName();
+                if (fieldName != null) {
+                    String fieldNameUpperCase = fieldName.toUpperCase();
                     if (!fieldNames.contains(fieldNameUpperCase)) {
                         fieldNames.add(fieldNameUpperCase);
+                        entry.fieldToColumnMap.put(inspectedField, fieldName);
                     } else {
                         // I've seen some weird cases where there was case 
mismatch between the class attr name and the qe field.
                         // To accept as the same field, both the name (without 
casing) and the field type must match.
                         Optional<Map.Entry<String, String>> 
existingEntryForField = qe.getFields().entrySet().stream()
-                                .filter(e -> 
e.getKey().equalsIgnoreCase(inspectedField.fieldName)
-                                        && 
e.getValue().equals(inspectedField.typeName))
+                                .filter(e -> 
e.getKey().equalsIgnoreCase(fieldName)
+                                        && 
e.getValue().equals(inspectedField.typeName()))
                                 .findFirst();
 
                         if (existingEntryForField.isPresent()) {
                             // We will switch our inspected field name to 
match the casing in the QE and hope for the best.
-                            inspectedField.fieldName = 
existingEntryForField.get().getKey();
+                            entry.fieldToColumnMap.put(inspectedField, 
existingEntryForField.get().getKey());
                         } else {
-                            throw new 
FieldNameConflictException(inspectedField, fieldNames);
+                            unnamedFields.add(entry);
                         }
                     }
-                } else if (inspectedField.fieldType == FieldType.PRIMITIVE) {
-                    @Nullable var field = entry.getMiddle().get();
+                } else if (inspectedField.fieldType() == 
InspectedFieldType.PRIMITIVE) {
+                    @Nullable var field = entry.keyFieldSupplier.get();
                     if (field == null) {
-                        unnamedFields.add(Map.entry(inspectedField, 
entry.getRight()));
-                    } else if 
(inspectedField.typeName.equals(field.getValue())) {
-                        inspectedField.fieldName = field.getKey();
+                        unnamedFields.add(entry);
+                    } else if 
(inspectedField.typeName().equals(field.getValue())) {
+                        entry.fieldToColumnMap.put(inspectedField, 
field.getKey());
                     } else {
-                        throw 
FieldNameConflictException.forSpecificField(inspectedField.fieldName, 
inspectedField.typeName,
-                                field.getValue());
+                        throw 
FieldNameConflictException.forSpecificField(fieldName, 
inspectedField.typeName(), field.getValue());
                     }
                 } else {
-                    unnamedFields.add(Map.entry(inspectedField, 
entry.getRight()));
+                    unnamedFields.add(entry);
                 }
             }
 
             // Assign custom field names
-            for (Map.Entry<InspectedField, Supplier<String>> unnamedFieldEntry 
: unnamedFields) {
+            for (Entry unnamedEntry : unnamedFields) {
                 // Get a valid candidate for the field.
-                String fieldName = 
Stream.generate(unnamedFieldEntry.getValue())
+                String fieldName = 
Stream.generate(unnamedEntry.fieldNameCandidateSupplier)
                         .filter(not(fieldNames::contains))
                         .findFirst()
                         .get();
 
                 fieldNames.add(fieldName);
-                unnamedFieldEntry.getKey().fieldName = fieldName;
+                unnamedEntry.fieldToColumnMap.put(unnamedEntry.inspectedField, 
fieldName);
             }
         }
 
@@ -589,28 +564,38 @@ public class SqlDdlGenerator {
         {
             // Set keyFieldName if there is only one key field.
             if (keyFields.size() == 1) {
-                qe.setKeyFieldName(keyFields.get(0).fieldName);
-                qe.setKeyType(keyFields.get(0).typeName);
+                InspectedField inspectedField = keyFields.get(0);
+                String columnName = keyFieldToColumnMap.get(inspectedField);
+
+                qe.setKeyFieldName(columnName);
+                qe.setKeyType(inspectedField.typeName());
             }
 
             for (InspectedField inspectedField : keyFields) {
-                qe.getKeyFields().add(inspectedField.fieldName);
+                String columnName = keyFieldToColumnMap.get(inspectedField);
+
+                qe.getKeyFields().add(columnName);
 
-                addToQueryEntity(qe, inspectedField, false);
-                mapsPojo = mapsPojo || inspectedField.fieldType == 
FieldType.POJO_ATTRIBUTE;
+                addToQueryEntity(qe, columnName, inspectedField.typeName(), 
inspectedField.fieldType(), false);
+                mapsPojo = mapsPojo || inspectedField.fieldType() == 
InspectedFieldType.POJO_ATTRIBUTE;
             }
         }
 
         // Process value fields
         {
             if (valFields.size() == 1) {
-                qe.setValueFieldName(valFields.get(0).fieldName);
-                qe.setValueType(valFields.get(0).typeName);
+                InspectedField inspectedField = valFields.get(0);
+                String columnName = valFieldToColumnMap.get(inspectedField);
+
+                qe.setValueFieldName(columnName);
+                qe.setValueType(inspectedField.typeName());
             }
 
             for (InspectedField inspectedField : valFields) {
-                addToQueryEntity(qe, inspectedField);
-                mapsPojo = mapsPojo || inspectedField.fieldType == 
FieldType.POJO_ATTRIBUTE;
+                String columnName = valFieldToColumnMap.get(inspectedField);
+
+                addToQueryEntity(qe, columnName, inspectedField.typeName(), 
inspectedField.fieldType(), inspectedField.nullable());
+                mapsPojo = mapsPojo || inspectedField.fieldType() == 
InspectedFieldType.POJO_ATTRIBUTE;
             }
         }
 
@@ -620,7 +605,7 @@ public class SqlDdlGenerator {
             qe.getFields().put(EXTRA_FIELDS_COLUMN_NAME, 
byte[].class.getName());
         }
 
-        return qe;
+        return new QueryEntityEvaluation(qe, keyFieldToColumnMap, 
valFieldToColumnMap);
     }
 
     /**
@@ -635,13 +620,13 @@ public class SqlDdlGenerator {
         String tableName = "\"" + cacheCfg.getName() + "\"";
         // TODO: check if tableName needs quoting.
 
-        QueryEntity qryEntity = getOrCreateQueryEntity(cacheCfg);
+        QueryEntityEvaluation queryEntityEvaluation = 
getOrCreateQueryEntity(cacheCfg);
+        QueryEntity qryEntity = queryEntityEvaluation.queryEntity;
 
         int defIdx = 0;
         int pkIdx = 0;
         String[] pkColumnNames = new String[qryEntity.getKeyFields().size()];
         ColumnDefinition[] colDefinitions = new 
ColumnDefinition[qryEntity.getFields().size()];
-        Map<String, String> fieldNameForColumnMappings = new 
HashMap<>(qryEntity.getFields().size());
         for (Map.Entry<String, String> entry : 
qryEntity.getFields().entrySet()) {
             String fieldName = entry.getKey();
             Class<?> klass;
@@ -668,19 +653,42 @@ public class SqlDdlGenerator {
             if (qryEntity.getKeyFields().contains(fieldName)) {
                 pkColumnNames[pkIdx++] = columnName;
             }
-
-            fieldNameForColumnMappings.put(columnName, fieldName);
         }
 
+        // Create fieldName for column mappings
+        Function<@Nullable Map<InspectedField, String>, @Nullable Map<String, 
String>> processFieldToColumnMap = map -> {
+            if (map == null) {
+                return null;
+            }
+
+            Map<String, String> ret = new HashMap<>(map.size());
+            for (Map.Entry<InspectedField, String> e : map.entrySet()) {
+                @Nullable String fieldName = e.getKey().fieldName();
+                // Should probably check against the field type.
+                if (fieldName != null) {
+                    String columnName = e.getValue();
+                    // Process QE Aliases.
+                    columnName = 
qryEntity.getAliases().getOrDefault(columnName, columnName);
+                    ret.put(columnName, fieldName);
+                }
+            }
+
+            return ret;
+        };
+
+        @Nullable Map<String, String> keyFieldForColumn = 
processFieldToColumnMap.apply(queryEntityEvaluation.keyInspectedFieldMap);
+        @Nullable Map<String, String> valFieldForColumn = 
processFieldToColumnMap.apply(queryEntityEvaluation.valInspectedFieldMap);
+
         var table = TableDefinition.builder(tableName)
                 .schema(schema)
                 .columns(colDefinitions)
                 .primaryKey(pkColumnNames)
                 .build();
 
-        // TODO: Test one of these are null;
-        @Nullable Map.Entry<String, String> typeHints = 
Map.entry(qryEntity.getKeyType(), qryEntity.getValueType());
-        return new GenerateTableResult(table, fieldNameForColumnMappings, 
typeHints);
+        return new GenerateTableResult(
+                table,
+                new TableTypeDescriptor(qryEntity.getKeyType(), 
qryEntity.getValueType(), keyFieldForColumn, valFieldForColumn)
+        );
     }
 
     public TableDefinition generateTableDefinition(CacheConfiguration<?, ?> 
cacheCfg) throws FieldNameConflictException {
@@ -689,13 +697,16 @@ public class SqlDdlGenerator {
 
     // TODO: https://issues.apache.org/jira/browse/IGNITE-26177
     @SuppressWarnings("PMD.UnnecessaryCast")
-    private QueryEntity getOrCreateQueryEntity(CacheConfiguration cacheCfg) 
throws FieldNameConflictException {
+    private QueryEntityEvaluation getOrCreateQueryEntity(CacheConfiguration 
cacheCfg) throws FieldNameConflictException {
         // TODO: Map the whole object and key instead of the query entities
         QueryEntity qe;
 
         Map.Entry<Class<?>, Class<?>> typeHints = null;
         try {
-            typeHints = 
this.tableTypeRegistry.typesForTable(cacheCfg.getName());
+            @Nullable TableTypeDescriptor tableDescriptor = 
this.tableTypeRegistry.typesForTable(cacheCfg.getName());
+            if (tableDescriptor != null) {
+                typeHints = 
TableTypeRegistryUtils.typesToEntry(tableDescriptor);
+            }
         } catch (ClassNotFoundException ex) {
             LOGGER.error("Found TableTypeHint for cache but one of the class 
was not in the Classpath: {}", cacheCfg.getName(), ex);
         }
@@ -728,94 +739,66 @@ public class SqlDdlGenerator {
         return populateQueryEntity(qe, allowExtraFields);
     }
 
-    enum FieldType {
-        PRIMITIVE,
-        ARRAY,
-        POJO_ATTRIBUTE
-    }
-
-    static class InspectedField {
-        @Nullable
-        private String fieldName;
-
-        private String typeName;
-
-        private FieldType fieldType;
-
-        private boolean nullable;
-
-        public InspectedField(@Nullable String fieldName, String typeName, 
FieldType fieldType) {
-            this(fieldName, typeName, fieldType, !(fieldType == 
FieldType.PRIMITIVE || fieldType == FieldType.ARRAY));
-        }
-
-        public InspectedField(@Nullable String fieldName, String typeName, 
FieldType fieldType, boolean nullable) {
-            this.fieldName = fieldName;
-            this.typeName = typeName;
-            this.fieldType = fieldType;
-            this.nullable = nullable;
-        }
-
-        public String getFieldName() {
-            return fieldName;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public FieldType getFieldType() {
-            return fieldType;
-        }
-
-        public boolean isNullable() {
-            return nullable;
-        }
-
-        @Override
-        public String toString() {
-            return "InspectedField{"
-                    + "fieldName='" + fieldName + '\''
-                    + ", typeName='" + typeName + '\''
-                    + ", fieldType=" + fieldType
-                    + ", nullable=" + nullable
-                    + '}';
-        }
-    }
-
     /** GenerateTableResult. */
     public static class GenerateTableResult {
         private final TableDefinition tableDefinition;
 
-        private final Map<String, String> fieldNameForColumn;
-
-        @Nullable
-        private final Map.Entry<String, String> rawTypeHints;
+        private final TableTypeDescriptor tableTypeDescriptor;
 
         /**
          * Constructor.
          *
          * @param tableDefinition Table definition.
-         * @param fieldNameForColumn Mapping of columns to their corresponding 
field names.
-         * @param rawTypeHints Mapping of type hints by column.
+         * @param tableTypeDescriptor Description of the types for the table.
          */
-        public GenerateTableResult(TableDefinition tableDefinition, 
Map<String, String> fieldNameForColumn,
-                Map.Entry<String, String> rawTypeHints) {
+        public GenerateTableResult(
+                TableDefinition tableDefinition,
+                TableTypeDescriptor tableTypeDescriptor
+        ) {
             this.tableDefinition = tableDefinition;
-            this.fieldNameForColumn = fieldNameForColumn;
-            this.rawTypeHints = rawTypeHints;
+            this.tableTypeDescriptor = tableTypeDescriptor;
         }
 
         public TableDefinition tableDefinition() {
             return tableDefinition;
         }
 
-        public Map<String, String> fieldNameForColumnMappings() {
-            return fieldNameForColumn;
+        public TableTypeDescriptor tableTypeDescriptor() {
+            return tableTypeDescriptor;
         }
 
-        @Nullable
         public Map.Entry<String, String> typeHints() {
-            return rawTypeHints;
+            return tableTypeDescriptor.typeHints();
+        }
+
+        /** Combines FieldNamesForColumn mappings for keys and values into the 
same map. */
+        public Map<String, String> fieldToColumnMappings() {
+            Map<String, String> ret = new HashMap<>();
+
+            
Optional.ofNullable(tableTypeDescriptor.keyFieldNameForColumn()).ifPresent(ret::putAll);
+            
Optional.ofNullable(tableTypeDescriptor.valFieldNameForColumn()).ifPresent(ret::putAll);
+
+            return ret;
+        }
+    }
+
+    static class QueryEntityEvaluation {
+        private final QueryEntity queryEntity;
+
+        @Nullable
+        private final Map<InspectedField, String> keyInspectedFieldMap;
+
+        @Nullable
+        private final Map<InspectedField, String> valInspectedFieldMap;
+
+        QueryEntityEvaluation(
+                QueryEntity queryEntity,
+                @Nullable Map<InspectedField, String> keyInspectedFieldMap,
+                @Nullable Map<InspectedField, String> valInspectedFieldMap
+        ) {
+            this.queryEntity = queryEntity;
+            this.keyInspectedFieldMap = keyInspectedFieldMap;
+            this.valInspectedFieldMap = valInspectedFieldMap;
         }
     }
 
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/PersistentTableTypeRegistryImpl.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/PersistentTableTypeRegistryImpl.java
index 430f92aa876..a2ba20a3f5f 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/PersistentTableTypeRegistryImpl.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/PersistentTableTypeRegistryImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.migrationtools.tablemanagement;
 
 import static org.apache.ignite3.catalog.definitions.ColumnDefinition.column;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite3.catalog.ColumnType;
@@ -36,6 +38,8 @@ public class PersistentTableTypeRegistryImpl implements 
TableTypeRegistry {
 
     private final CompletableFuture<KeyValueView<String, TableTypeRecord>> 
tableFuture;
 
+    private final ObjectMapper jsonObjectMapper = new ObjectMapper();
+
     public PersistentTableTypeRegistryImpl(IgniteClient client) {
         this.client = client;
         this.tableFuture = initTable().thenApply(table -> 
table.keyValueView(String.class, TableTypeRecord.class));
@@ -49,7 +53,9 @@ public class PersistentTableTypeRegistryImpl implements 
TableTypeRegistry {
                 .columns(
                         column("TABLE_KEY", ColumnType.VARCHAR),
                         column("keyClass", ColumnType.VARCHAR),
-                        column("valClass", ColumnType.VARCHAR))
+                        column("valClass", ColumnType.VARCHAR),
+                        column("keyColumnMappings", 
ColumnType.VARCHAR.nullable(true)),
+                        column("valColumnMappings", 
ColumnType.VARCHAR.nullable(true)))
                 .primaryKey("TABLE_KEY")
                 .build();
 
@@ -57,24 +63,53 @@ public class PersistentTableTypeRegistryImpl implements 
TableTypeRegistry {
     }
 
     @Override
-    public @Nullable Map.Entry<Class<?>, Class<?>> typesForTable(String 
tableName) throws ClassNotFoundException {
+    public @Nullable TableTypeDescriptor typesForTable(String tableName) {
         var table = this.tableFuture.join();
-        var types = table.get(null, tableName);
-
+        @Nullable TableTypeRecord types = table.get(null, tableName);
         if (types == null) {
             return null;
-        } else {
-            var keyClass = Class.forName(types.keyClass);
-            var valClass = Class.forName(types.valClass);
-            return Map.entry(keyClass, valClass);
+        }
+
+        try {
+            Map<String, String> keyColumnMappings = (types.keyColumnMappings 
!= null)
+                    ? this.jsonObjectMapper.readValue(types.keyColumnMappings, 
Map.class)
+                    : null;
+
+            Map<String, String> valColumnMappings = (types.valColumnMappings 
!= null)
+                    ? this.jsonObjectMapper.readValue(types.valColumnMappings, 
Map.class)
+                    : null;
+
+            return new TableTypeDescriptor(types.keyClass, types.valClass, 
keyColumnMappings, valColumnMappings);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void registerTypesForTable(String tableName, Map.Entry<String, 
String> tableTypes) {
+    public void registerTypesForTable(String tableName, TableTypeDescriptor 
tableDescriptor) {
         var table = this.tableFuture.join();
-        var record = new TableTypeRecord(tableTypes.getKey(), 
tableTypes.getValue());
-        table.put(null, tableName, record);
+
+        try {
+            String keyColumnMappingsAsJson = 
(tableDescriptor.keyFieldNameForColumn() != null)
+                    ? 
jsonObjectMapper.writeValueAsString(tableDescriptor.keyFieldNameForColumn())
+                    : null;
+
+            String valColumnMappingsAsJson = 
(tableDescriptor.valFieldNameForColumn() != null)
+                    ? 
jsonObjectMapper.writeValueAsString(tableDescriptor.valFieldNameForColumn())
+                    : null;
+
+            var record = new TableTypeRecord(
+                    tableDescriptor.keyClassName(),
+                    tableDescriptor.valClassName(),
+                    keyColumnMappingsAsJson,
+                    valColumnMappingsAsJson
+            );
+
+            table.put(null, tableName, record);
+        } catch (JsonProcessingException e) {
+            // TODO: Figure out what to do next.
+            throw new RuntimeException(e);
+        }
     }
 
     private static class TableTypeRecord {
@@ -82,13 +117,21 @@ public class PersistentTableTypeRegistryImpl implements 
TableTypeRegistry {
 
         private String valClass;
 
+        @Nullable
+        private String keyColumnMappings;
+
+        @Nullable
+        private String valColumnMappings;
+
         public TableTypeRecord() {
             // Intentionally left blank
         }
 
-        public TableTypeRecord(String keyClass, String valClass) {
+        public TableTypeRecord(String keyClass, String valClass, @Nullable 
String keyColumnMappings, @Nullable String valColumnMappings) {
             this.keyClass = keyClass;
             this.valClass = valClass;
+            this.keyColumnMappings = keyColumnMappings;
+            this.valColumnMappings = valColumnMappings;
         }
     }
 }
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
index 063a55f406f..d348b65589b 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.migrationtools.tablemanagement;
 
-import java.util.Map;
 import org.jetbrains.annotations.Nullable;
 
 /** Decorator for {@link TableTypeRegistry} that only registers new hints. */
@@ -28,14 +27,14 @@ public class RegisterOnlyTableTypeRegistry implements 
TableTypeRegistry {
         this.base = base;
     }
 
-    @Override
     @Nullable
-    public Map.Entry<Class<?>, Class<?>> typesForTable(String tableName) 
throws ClassNotFoundException {
+    @Override
+    public TableTypeDescriptor typesForTable(String tableName) {
         return null;
     }
 
     @Override
-    public void registerTypesForTable(String tableName, Map.Entry<String, 
String> tableTypes) {
-        base.registerTypesForTable(tableName, tableTypes);
+    public void registerTypesForTable(String tableName, TableTypeDescriptor 
tableDescriptor) {
+        base.registerTypesForTable(tableName, tableDescriptor);
     }
 }
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeDescriptor.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeDescriptor.java
new file mode 100644
index 00000000000..de191444360
--- /dev/null
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.migrationtools.tablemanagement;
+
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/** Describes the key and value types of a Table. */
+public class TableTypeDescriptor {
+    private final String keyClassName;
+
+    private final String valClassName;
+
+    @Nullable
+    private final Map<String, String> keyFieldNameForColumn;
+
+    @Nullable
+    private final Map<String, String> valFieldNameForColumn;
+
+    /**
+     * Constructor.
+     *
+     * @param keyClassName Name of the key type.
+     * @param valClassName Name of the value type.
+     * @param keyFieldNameForColumn Mapping of key columns to their 
corresponding field names.
+     *      May be null if the key class was not available.
+     * @param valFieldNameForColumn Mapping of value columns to their 
corresponding field names.
+     *      May be null if the value class was not available.
+     */
+    public TableTypeDescriptor(
+            String keyClassName,
+            String valClassName,
+            @Nullable Map<String, String> keyFieldNameForColumn,
+            @Nullable Map<String, String> valFieldNameForColumn
+    ) {
+        this.keyClassName = keyClassName;
+        this.valClassName = valClassName;
+        this.keyFieldNameForColumn = keyFieldNameForColumn;
+        this.valFieldNameForColumn = valFieldNameForColumn;
+    }
+
+    public String keyClassName() {
+        return keyClassName;
+    }
+
+    public String valClassName() {
+        return valClassName;
+    }
+
+    public Map.Entry<String, String> typeHints() {
+        return Map.entry(keyClassName, valClassName);
+    }
+
+    @Nullable
+    public Map<String, String> keyFieldNameForColumn() {
+        return keyFieldNameForColumn;
+    }
+
+    @Nullable
+    public Map<String, String> valFieldNameForColumn() {
+        return valFieldNameForColumn;
+    }
+}
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistry.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistry.java
index b9fb83484d5..a72a924b041 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistry.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistry.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.migrationtools.tablemanagement;
 
-import java.util.Map;
 import org.jetbrains.annotations.Nullable;
 
 /** This interface provides a registry for mappings between tables and their 
corresponding java types. */
@@ -28,17 +27,13 @@ public interface TableTypeRegistry {
      * @param tableName Must be escaped according to Ignite 3 rules.
      * @return The type hints for the table, or null if none are available.
      */
-    @Nullable Map.Entry<Class<?>, Class<?>> typesForTable(String tableName) 
throws ClassNotFoundException;
+    @Nullable TableTypeDescriptor typesForTable(String tableName);
 
     /**
      * Registers the supplied type hints for the given table. Existing hints 
will be replaced.
      *
      * @param tableName Must be escaped according to Ignite 3 rules.
-     * @param tableTypes Entry with the ClassNames of Key and Value types, as 
returned by {@link Class#getName()}.
+     * @param tableDescriptor Table Descriptor.
      */
-    void registerTypesForTable(String tableName, Map.Entry<String, String> 
tableTypes);
-
-    static Map.Entry<String, String> typeHints(Class<?> keyType, Class<?> 
valType) {
-        return Map.entry(keyType.getName(), valType.getName());
-    }
+    void registerTypesForTable(String tableName, TableTypeDescriptor 
tableDescriptor);
 }
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryMapImpl.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryMapImpl.java
index 109e3f1cf8f..995936467e9 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryMapImpl.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryMapImpl.java
@@ -23,24 +23,15 @@ import org.jetbrains.annotations.Nullable;
 
 /** {@link TableTypeRegistry} implementation based on a in-memory map. */
 public class TableTypeRegistryMapImpl implements TableTypeRegistry {
-    private final Map<String, Map.Entry<String, String>> hints = new 
HashMap<>();
+    private final Map<String, TableTypeDescriptor> hints = new HashMap<>();
 
     @Override
-    public @Nullable Map.Entry<Class<?>, Class<?>> typesForTable(String 
tableName) {
-        var typeNames = hints.get(tableName);
-        if (typeNames == null) {
-            return null;
-        }
-
-        try {
-            return Map.entry(Class.forName(typeNames.getKey()), 
Class.forName(typeNames.getValue()));
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
+    public @Nullable TableTypeDescriptor typesForTable(String tableName) {
+        return hints.get(tableName);
     }
 
     @Override
-    public void registerTypesForTable(String tableName, Map.Entry<String, 
String> tableTypes) {
-        this.hints.putIfAbsent(tableName, tableTypes);
+    public void registerTypesForTable(String tableName, TableTypeDescriptor 
tableDescriptor) {
+        this.hints.putIfAbsent(tableName, tableDescriptor);
     }
 }
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryUtils.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryUtils.java
new file mode 100644
index 00000000000..b7f2461cf74
--- /dev/null
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/TableTypeRegistryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.migrationtools.tablemanagement;
+
+import java.util.Map;
+
+/** Utility class for handling TableTypeRegistries. */
+public class TableTypeRegistryUtils {
+    /**
+     * Materializes the table types into a key/value pair.
+     *
+     * @param descriptor Table Type descriptor.
+     * @return Materialized key/value pair of the table types.
+     * @throws ClassNotFoundException if one of the types is not available on 
the classpath.
+     */
+    public static Map.Entry<Class<?>, Class<?>> 
typesToEntry(TableTypeDescriptor descriptor) throws ClassNotFoundException {
+        return Map.entry(
+                Class.forName(descriptor.keyClassName()),
+                Class.forName(descriptor.valClassName())
+        );
+    }
+
+    /**
+     * Creates a table descriptor with just the supplied type hints.
+     *
+     * @param keyClass Type of the key.
+     * @param valClass Type of the value.
+     * @return the new TableTypeDescriptor instance.
+     */
+    public static TableTypeDescriptor typeHints(Class<?> keyClass, Class<?> 
valClass) {
+        return new TableTypeDescriptor(keyClass.getName(), valClass.getName(), 
null, null);
+    }
+}
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/InspectedField.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/InspectedField.java
new file mode 100644
index 00000000000..ad9afa4b13e
--- /dev/null
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/InspectedField.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.migrationtools.types;
+
+import java.util.Objects;
+import org.jetbrains.annotations.Nullable;
+
+/** Holds information about how a field should be persisted. */
+public class InspectedField {
+    @Nullable
+    private final String fieldName;
+
+    private final String typeName;
+
+    private final InspectedFieldType fieldType;
+
+    private final boolean nullable;
+
+    private final boolean hasAnnotation;
+
+    /**
+     * Default constructor.
+     *
+     * @param fieldName Name of the field. May be null for Primitive and Array 
fields.
+     * @param typeName Name of the field class, usually from {@link 
Class#getName()}
+     * @param fieldType Field type.
+     * @param nullable Whether the field is nullable or not.
+     * @param hasAnnotation Whether the field was annotated with a 
'QuerySqlField'.
+     */
+    private InspectedField(
+            @Nullable String fieldName,
+            String typeName,
+            InspectedFieldType fieldType,
+            boolean nullable,
+            boolean hasAnnotation
+    ) {
+        this.fieldName = fieldName;
+        this.typeName = typeName;
+        this.fieldType = fieldType;
+        this.nullable = nullable;
+        this.hasAnnotation = hasAnnotation;
+    }
+
+    /**
+     * Factory method for primitive and array fields.
+     *
+     * @param typeName Type name.
+     * @param fieldType Field type.
+     * @return The new InspectedField instance.
+     */
+    public static InspectedField forUnnamed(String typeName, 
InspectedFieldType fieldType) {
+        if (fieldType != InspectedFieldType.PRIMITIVE && fieldType != 
InspectedFieldType.ARRAY) {
+            throw new IllegalArgumentException("'fieldType' must be PRIMITIVE 
or ARRAY");
+        }
+
+        return new InspectedField(
+                null,
+                typeName,
+                fieldType,
+                false,
+                false
+        );
+    }
+
+    /**
+     * Factory method for named fields (non-primitive and non-array).
+     *
+     * @param fieldName Name of the field.
+     * @param typeName Name of the field class, usually from {@link 
Class#getName()}
+     * @param fieldType Field type.
+     * @param nullable Whether the field is nullable or not.
+     * @param hasAnnotation Whether the field was annotated with a 
'QuerySqlField'.
+     * @return The new InspectedField instance.
+     */
+    public static InspectedField forNamed(
+            String fieldName,
+            String typeName,
+            InspectedFieldType fieldType,
+            boolean nullable,
+            boolean hasAnnotation
+    ) {
+        if (fieldType == InspectedFieldType.PRIMITIVE || fieldType == 
InspectedFieldType.ARRAY) {
+            throw new IllegalArgumentException("'fieldType' must not be 
PRIMITIVE or ARRAY");
+        }
+
+        return new InspectedField(
+                fieldName,
+                typeName,
+                fieldType,
+                nullable,
+                hasAnnotation
+        );
+    }
+
+    @Nullable
+    public String fieldName() {
+        return fieldName;
+    }
+
+    public String typeName() {
+        return typeName;
+    }
+
+    public InspectedFieldType fieldType() {
+        return fieldType;
+    }
+
+    public boolean nullable() {
+        return nullable;
+    }
+
+    public boolean hasAnnotation() {
+        return hasAnnotation;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        InspectedField that = (InspectedField) o;
+        return nullable == that.nullable && hasAnnotation == 
that.hasAnnotation && Objects.equals(fieldName, that.fieldName)
+                && Objects.equals(typeName, that.typeName) && fieldType == 
that.fieldType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fieldName, typeName, fieldType, nullable, 
hasAnnotation);
+    }
+
+    @Override
+    public String toString() {
+        return "InspectedField{"
+                + "fieldName='" + fieldName + '\''
+                + ", typeName='" + typeName + '\''
+                + ", fieldType=" + fieldType
+                + ", nullable=" + nullable
+                + ", hasAnnotation=" + hasAnnotation
+                + '}';
+    }
+}
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/InspectedFieldType.java
similarity index 51%
copy from 
migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
copy to 
migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/InspectedFieldType.java
index 063a55f406f..4ca536a6aff 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/tablemanagement/RegisterOnlyTableTypeRegistry.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/InspectedFieldType.java
@@ -15,27 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.migrationtools.tablemanagement;
+package org.apache.ignite.migrationtools.types;
 
-import java.util.Map;
-import org.jetbrains.annotations.Nullable;
-
-/** Decorator for {@link TableTypeRegistry} that only registers new hints. */
-public class RegisterOnlyTableTypeRegistry implements TableTypeRegistry {
-    private final TableTypeRegistry base;
-
-    public RegisterOnlyTableTypeRegistry(TableTypeRegistry base) {
-        this.base = base;
-    }
-
-    @Override
-    @Nullable
-    public Map.Entry<Class<?>, Class<?>> typesForTable(String tableName) 
throws ClassNotFoundException {
-        return null;
-    }
-
-    @Override
-    public void registerTypesForTable(String tableName, Map.Entry<String, 
String> tableTypes) {
-        base.registerTypesForTable(tableName, tableTypes);
-    }
+/** InspectedFieldTypeEnum. */
+public enum InspectedFieldType {
+    PRIMITIVE,
+    ARRAY,
+    POJO_ATTRIBUTE,
+    NESTED_POJO_ATTRIBUTE,
 }
diff --git 
a/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/TypeInspector.java
 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/TypeInspector.java
new file mode 100644
index 00000000000..770ab7cd867
--- /dev/null
+++ 
b/migration-tools/modules/migration-tools-commons/src/main/java/org/apache/ignite/migrationtools/types/TypeInspector.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.migrationtools.types;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite3.catalog.ColumnType;
+import org.apache.ignite3.table.mapper.Mapper;
+import org.jetbrains.annotations.Nullable;
+
+/** Utility class that provides methods to identify relevant fields in 
classes. */
+public class TypeInspector {
+    private static final Map<Class<?>, ColumnType<?>> COL_TYPE_REF;
+
+    static {
+        try {
+            COL_TYPE_REF = (Map<Class<?>, ColumnType<?>>) 
FieldUtils.readDeclaredStaticField(ColumnType.class, "TYPES", true);
+            // TODO: IGNITE-25351 Remove
+            COL_TYPE_REF.remove(java.util.Date.class);
+
+            var constructor = 
ColumnType.class.getDeclaredConstructor(Class.class, String.class);
+            constructor.setAccessible(true);
+            constructor.newInstance(Character.class, "CHAR");
+            constructor.newInstance(BitSet.class, "VARBINARY");
+            constructor.newInstance(LocalTime.class, "TIME");
+            constructor.newInstance(LocalDate.class, "DATE");
+            constructor.newInstance(LocalDateTime.class, "TIMESTAMP");
+            constructor.newInstance(Instant.class, "TIMESTAMP");
+            constructor.newInstance(java.util.Date.class, "TIMESTAMP");
+            constructor.newInstance(Enum.class, "VARCHAR");
+            // TODO: IGNITE-25351 Remove
+            constructor.newInstance(java.sql.Date.class, "DATE");
+            constructor.newInstance(java.sql.Time.class, "TIME");
+            constructor.newInstance(java.sql.Timestamp.class, "TIMESTAMP");
+            // Collections
+            constructor.newInstance(Collection.class, "VARBINARY");
+            constructor.newInstance(List.class, "VARBINARY");
+            constructor.newInstance(Set.class, "VARBINARY");
+            // Primitive Arrays
+            constructor.newInstance(boolean[].class, "VARBINARY");
+            constructor.newInstance(char[].class, "VARBINARY");
+            constructor.newInstance(short[].class, "VARBINARY");
+            constructor.newInstance(int[].class, "VARBINARY");
+            constructor.newInstance(long[].class, "VARBINARY");
+            constructor.newInstance(float[].class, "VARBINARY");
+            constructor.newInstance(double[].class, "VARBINARY");
+            constructor.newInstance(String[].class, "VARBINARY");
+        } catch (IllegalAccessException | NoSuchMethodException | 
InvocationTargetException | InstantiationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Inspects the given class, extracting information about how its fields 
should be persisted.
+     *
+     * @param type Class to be inspected.
+     * @return List of the inspected fields.
+     */
+    public static List<InspectedField> inspectType(Class<?> type) {
+        Class<?> rootType = ClassUtils.primitiveToWrapper(type);
+        String rootTypeName = rootType.getName();
+
+        if (rootType.isArray() || Collection.class.isAssignableFrom(rootType)) 
{
+            return 
Collections.singletonList(InspectedField.forUnnamed(rootTypeName, 
InspectedFieldType.ARRAY));
+        } else if (isPrimitiveType(rootType)) {
+            return 
Collections.singletonList(InspectedField.forUnnamed(rootTypeName, 
InspectedFieldType.PRIMITIVE));
+        } else {
+            Field[] fields = rootType.getDeclaredFields();
+            List<InspectedField> ret = new ArrayList<>(fields.length);
+            for (Field field : fields) {
+                if (shouldPersistField(field)) {
+                    @Nullable QuerySqlField annotation = 
field.getAnnotation(QuerySqlField.class);
+                    boolean hasAnnotation = annotation != null;
+
+                    Class<?> origFieldType = field.getType();
+                    Class<?> wrappedFieldType = 
ClassUtils.primitiveToWrapper(origFieldType);
+
+                    boolean nullable = !origFieldType.isPrimitive();
+
+                    InspectedFieldType inspectedFieldType = 
isPrimitiveType(wrappedFieldType)
+                            ? InspectedFieldType.POJO_ATTRIBUTE
+                            : InspectedFieldType.NESTED_POJO_ATTRIBUTE;
+
+                    InspectedField inspectedField = InspectedField.forNamed(
+                            field.getName(),
+                            wrappedFieldType.getName(),
+                            inspectedFieldType,
+                            nullable,
+                            hasAnnotation
+                    );
+
+                    ret.add(inspectedField);
+                }
+            }
+
+            return ret;
+        }
+    }
+
+    private static boolean isPrimitiveType(Class<?> type) {
+        return type.isEnum() || Mapper.nativelySupported(type) || 
COL_TYPE_REF.containsKey(type);
+    }
+
+    private static boolean shouldPersistField(Field field) {
+        var mods = field.getModifiers();
+        return !Modifier.isStatic(mods) && !Modifier.isTransient(mods);
+    }
+}
diff --git 
a/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/sql/sql/SqlDdlGeneratorTest.java
 
b/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/sql/sql/SqlDdlGeneratorTest.java
index 435f2b7e54e..438bc3f4370 100644
--- 
a/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/sql/sql/SqlDdlGeneratorTest.java
+++ 
b/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/sql/sql/SqlDdlGeneratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.migrationtools.sql.sql;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Map.entry;
 import static 
org.apache.ignite.migrationtools.sql.SqlDdlGenerator.EXTRA_FIELDS_COLUMN_NAME;
 import static 
org.apache.ignite.migrationtools.sql.sql.SqlDdlGeneratorTest.ColumnRecord.nonKey;
@@ -50,19 +51,24 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.collections4.IterableUtils;
+import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.examples.model.Organization;
 import org.apache.ignite.examples.model.Person;
 import org.apache.ignite.migrationtools.sql.SqlDdlGenerator;
+import 
org.apache.ignite.migrationtools.sql.SqlDdlGenerator.GenerateTableResult;
+import org.apache.ignite.migrationtools.tablemanagement.TableTypeDescriptor;
 import 
org.apache.ignite.migrationtools.tablemanagement.TableTypeRegistryMapImpl;
 import org.apache.ignite.migrationtools.tests.models.ComplexKeyIntStr;
+import org.apache.ignite.migrationtools.tests.models.InterceptingFieldsModel;
 import org.apache.ignite.migrationtools.tests.models.SimplePojo;
 import org.apache.ignite3.catalog.ColumnSorted;
 import org.apache.ignite3.catalog.definitions.ColumnDefinition;
 import org.apache.ignite3.catalog.definitions.TableDefinition;
 import org.assertj.core.api.SoftAssertions;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Named;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -71,8 +77,6 @@ import org.junit.jupiter.params.provider.FieldSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 class SqlDdlGeneratorTest {
-    // TODO: Need to create tests to check if the module is populating the 
typehints correctly.
-
     private static final List<Named<Boolean>> EXTRA_FIELDS_ENABLED_ARG = 
List.of(
             named("No extra fields support", false),
             named("With extra fields support", true)
@@ -208,8 +212,20 @@ class SqlDdlGeneratorTest {
         };
     }
 
-    private static void testCacheConfig(CacheConfiguration<?, ?> cacheCfg, 
boolean allowExtraFields, List<ColumnRecord> asserts) {
-        var tableDef = generateTableDef(cacheCfg, allowExtraFields);
+    private static void testCacheConfig(
+            CacheConfiguration<?, ?> cacheCfg,
+            boolean allowExtraFields,
+            List<ColumnRecord> asserts,
+            Map.Entry<String, String> typeHints,
+            @Nullable Map<String, String> expectedKeyColumMappings,
+            @Nullable Map<String, String> expectedValColumMappings
+    ) {
+        SqlDdlGenerator gen = new SqlDdlGenerator(new 
TableTypeRegistryMapImpl(), allowExtraFields);
+        GenerateTableResult res = gen.generate(cacheCfg);
+
+        assertThat(res.typeHints()).isEqualTo(typeHints);
+
+        TableDefinition tableDef = res.tableDefinition();
         Stream<ColumnRecord> allowFieldsCol = (allowExtraFields)
                 ? Stream.of(new ColumnRecord(EXTRA_FIELDS_COLUMN_NAME, 
"VARBINARY", true, false))
                 : Stream.empty();
@@ -236,6 +252,18 @@ class SqlDdlGeneratorTest {
                 .containsExactlyInAnyOrderElementsOf(expectedColumns);
 
         sa.assertAll();
+
+        TableTypeDescriptor tableTypeDescriptor = res.tableTypeDescriptor();
+
+        // Check key mappings.
+        assertThat(tableTypeDescriptor.keyFieldNameForColumn())
+                .describedAs("Key column mappings")
+                .isEqualTo(expectedKeyColumMappings);
+
+        // Check value mappings.
+        assertThat(tableTypeDescriptor.valFieldNameForColumn())
+                .describedAs("Val column mappings")
+                .isEqualTo(expectedValColumMappings);
     }
 
     private static <K, V> Collector<Map.Entry<K, V>, ?, LinkedHashMap<K, V>> 
linkedMapCollector() {
@@ -256,16 +284,29 @@ class SqlDdlGeneratorTest {
             Class valType,
             String valDef
     ) {
+        String keyTypeName = ClassUtils.primitiveToWrapper(keyType).getName();
+        String valTypeName = ClassUtils.primitiveToWrapper(valType).getName();
+
         var cacheCfg = configWithIndexType(keyType, valType);
-        testCacheConfig(cacheCfg, false, List.of(
-                primaryKey("ID", keyDef),
-                nonKey("VAL", valDef, false)));
+        testCacheConfig(
+                cacheCfg,
+                false,
+                List.of(
+                    primaryKey("ID", keyDef),
+                    nonKey("VAL", valDef, false)
+                ),
+                entry(keyTypeName, valTypeName),
+                emptyMap(),
+                emptyMap()
+        );
     }
 
     @ParameterizedTest
     @MethodSource("provideCacheConfigSupplier")
-    void testTableDefWithComplexKeyAndSimplePojo(BiFunction<Class<?>, 
Class<?>, CacheConfiguration<?, ?>> cacheConfigSupplier,
-            boolean allowExtraFields) {
+    void testTableDefWithComplexKeyAndSimplePojo(
+            BiFunction<Class<?>, Class<?>, CacheConfiguration<?, ?>> 
cacheConfigSupplier,
+            boolean allowExtraFields
+    ) {
         var cacheCfg = cacheConfigSupplier.apply(ComplexKeyIntStr.class, 
SimplePojo.class);
         {
             QueryEntity qe = 
cacheCfg.getQueryEntities().stream().findFirst().orElseThrow();
@@ -275,25 +316,56 @@ class SqlDdlGeneratorTest {
             aliases.put("affinityStr", "AFFINITY_STR");
         }
 
-        testCacheConfig(cacheCfg, allowExtraFields, List.of(
-                primaryKey("id", "INT"),
-                primaryKey("AFFINITY_STR", "VARCHAR"),
-                nonKey("name", "VARCHAR", true),
-                nonKey("amount", "INT", false),
-                nonKey("decimalAmount", "DECIMAL", true)));
+        // TODO: This is wrong, we are not applying the aliases.
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                List.of(
+                    primaryKey("id", "INT"),
+                    primaryKey("AFFINITY_STR", "VARCHAR"),
+                    nonKey("name", "VARCHAR", true),
+                    nonKey("amount", "INT", false),
+                    nonKey("decimalAmount", "DECIMAL", true)
+                ),
+                entry(ComplexKeyIntStr.class.getName(), 
SimplePojo.class.getName()),
+                Map.ofEntries(
+                        entry("id", "id"),
+                        entry("AFFINITY_STR", "affinityStr")
+                ),
+                Map.ofEntries(
+                        entry("name", "name"),
+                        entry("amount", "amount"),
+                        entry("decimalAmount", "decimalAmount")
+                )
+        );
     }
 
     @ParameterizedTest
     @MethodSource("provideCacheConfigSupplier")
-    void testTableDefWithOrganizationPojo(BiFunction<Class<?>, Class<?>, 
CacheConfiguration<?, ?>> cacheConfigSupplier,
-            boolean allowExtraFields) {
+    void testTableDefWithOrganizationPojo(
+            BiFunction<Class<?>, Class<?>, CacheConfiguration<?, ?>> 
cacheConfigSupplier,
+            boolean allowExtraFields
+    ) {
         var cacheCfg = cacheConfigSupplier.apply(int.class, 
Organization.class);
-        testCacheConfig(cacheCfg, allowExtraFields, List.of(
-                primaryKey("KEY", "INT"),
-                nonKey("id", "BIGINT", true),
-                nonKey("name", "VARCHAR", true),
-                nonKey("type", "VARCHAR", true),
-                nonKey("lastUpdated", "TIMESTAMP", true)));
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                List.of(
+                    primaryKey("KEY", "INT"),
+                    nonKey("id", "BIGINT", true),
+                    nonKey("name", "VARCHAR", true),
+                    nonKey("type", "VARCHAR", true),
+                    nonKey("lastUpdated", "TIMESTAMP", true)
+                ),
+                entry(Integer.class.getName(), Organization.class.getName()),
+                emptyMap(),
+                Map.ofEntries(
+                        entry("id", "id"),
+                        entry("name", "name"),
+                        entry("type", "type"),
+                        entry("lastUpdated", "lastUpdated")
+                )
+        );
     }
 
     @ParameterizedTest
@@ -304,7 +376,23 @@ class SqlDdlGeneratorTest {
     ) {
         // TODO: Make dynamic pojos with BB to cover all the possible 
scenarios...
         var cacheCfg = cacheConfigSupplier.apply(int.class, Person.class);
-        testCacheConfig(cacheCfg, allowExtraFields, PERSON_EXPECTED_FIELDS);
+
+        Map<String, String> expectedValFieldToColumnMappings = Map.ofEntries(
+                entry("id", "id"),
+                entry("orgId", "orgId"),
+                entry("firstName", "firstName"),
+                entry("lastName", "lastName"),
+                entry("resume", "resume"),
+                entry("salary", "salary")
+        );
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                PERSON_EXPECTED_FIELDS,
+                entry(Integer.class.getName(), Person.class.getName()),
+                emptyMap(),
+                expectedValFieldToColumnMappings
+        );
     }
 
     @ParameterizedTest
@@ -313,15 +401,25 @@ class SqlDdlGeneratorTest {
             BiFunction<Class<?>, Class<?>, CacheConfiguration<?, ?>> 
cacheConfigSupplier,
             boolean allowExtraFields
     ) {
+        String valueTypeName = Person.class.getName().replace("Person", 
"FakePerson");
         var cacheCfg = cacheConfigSupplier.apply(int.class, Person.class);
         QueryEntity qe = 
cacheCfg.getQueryEntities().stream().findFirst().orElseThrow();
-        qe.setValueType(Person.class.getName().replace("Person", 
"FakePerson"));
+        qe.setValueType(valueTypeName);
 
         Set<String> notNullFields = new HashSet<>();
         notNullFields.add("salary");
         qe.setNotNullFields(notNullFields);
 
-        testCacheConfig(cacheCfg, allowExtraFields, PERSON_EXPECTED_FIELDS);
+        // Since the class is not in the classpath we expect empty mappings.
+        Map<String, String> expectedFieldToColumnMappings = emptyMap();
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                PERSON_EXPECTED_FIELDS,
+                entry(Integer.class.getName(), valueTypeName),
+                expectedFieldToColumnMappings,
+                null
+        );
     }
 
     @ParameterizedTest
@@ -343,7 +441,56 @@ class SqlDdlGeneratorTest {
                 .map(skipNth(1, c -> new 
ColumnRecord(StringUtils.swapCase(c.name), c.type, c.nullable, c.isPk)))
                 .collect(Collectors.toList());
 
-        testCacheConfig(cacheCfg, allowExtraFields, renameExpectedColumns);
+        var expectedValFieldToColumnMappings = PERSON_EXPECTED_FIELDS.stream()
+                .skip(1)
+                .collect(Collectors.toMap(c -> StringUtils.swapCase(c.name), c 
-> c.name));
+
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                renameExpectedColumns,
+                entry(Integer.class.getName(), Person.class.getName()),
+                emptyMap(),
+                expectedValFieldToColumnMappings
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideCacheConfigSupplier")
+    void testKeyAndValuePojoWithInterceptingFieldNames(
+            BiFunction<Class<?>, Class<?>, CacheConfiguration<?, ?>> 
cacheConfigSupplier,
+            boolean allowExtraFields
+    ) {
+        var cacheCfg = 
cacheConfigSupplier.apply(InterceptingFieldsModel.Key.class, 
InterceptingFieldsModel.Value.class);
+
+        // This order should not matter much.
+        List<ColumnRecord> expectedFields = List.of(
+                nonKey("key1", "BIGINT", false),
+                nonKey("key2", "BIGINT", false),
+                nonKey("\"VALUE\"", "VARCHAR", true),
+                primaryKey("ID", "INT"),
+                primaryKey("KEY", "INT")
+        );
+
+        Map<String, String> expectedKeyFieldToColumnMappings = Map.ofEntries(
+                entry("ID", "key1"),
+                entry("KEY", "key2")
+        );
+
+        Map<String, String> expectedValFieldToColumnMappings = Map.ofEntries(
+                entry("key1", "key1"),
+                entry("key2", "key2"),
+                entry("value", "value")
+        );
+
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                expectedFields,
+                entry(InterceptingFieldsModel.Key.class.getName(), 
InterceptingFieldsModel.Value.class.getName()),
+                expectedKeyFieldToColumnMappings,
+                expectedValFieldToColumnMappings
+        );
     }
 
     @ParameterizedTest
@@ -352,17 +499,33 @@ class SqlDdlGeneratorTest {
         CacheConfiguration<?, ?> cacheCfg = new 
CacheConfiguration<>("some-cache");
         
cacheCfg.setQueryEntities(Collections.singletonList(POJO_WITH_PRIMITIVES_QE));
 
-        testCacheConfig(cacheCfg, allowExtraFields, 
POJO_WITH_PRIMITIVES_FIELDS);
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                POJO_WITH_PRIMITIVES_FIELDS,
+                entry(Long.class.getName(), Object.class.getName()),
+                emptyMap(),
+                null
+        );
     }
 
     @ParameterizedTest
     @FieldSource("EXTRA_FIELDS_ENABLED_ARG")
     void testTableDefWithPojoWithPrimitiveFieldsDefinedInTypes(boolean 
allowExtraFields) {
+        String valType = "PersonRecordClassInRoot";
+
         CacheConfiguration<?, ?> cacheCfg = new 
CacheConfiguration<>("some-cache");
         cacheCfg.setQueryEntities(Collections.singletonList(
-                new 
QueryEntity(POJO_WITH_PRIMITIVES_QE).setKeyType(long.class.getName()).setValueType("PersonRecordClassInRoot")));
-
-        testCacheConfig(cacheCfg, allowExtraFields, 
POJO_WITH_PRIMITIVES_FIELDS);
+                new 
QueryEntity(POJO_WITH_PRIMITIVES_QE).setKeyType(long.class.getName()).setValueType(valType)));
+
+        testCacheConfig(
+                cacheCfg,
+                allowExtraFields,
+                POJO_WITH_PRIMITIVES_FIELDS,
+                entry(Long.class.getName(), valType),
+                emptyMap(),
+                null
+        );
     }
 
     @Test
diff --git 
a/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/types/TypeInspectorTest.java
 
b/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/types/TypeInspectorTest.java
new file mode 100644
index 00000000000..b2eadc0ec87
--- /dev/null
+++ 
b/migration-tools/modules/migration-tools-commons/src/test/java/org/apache/ignite/migrationtools/types/TypeInspectorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.migrationtools.types;
+
+import static org.apache.ignite.migrationtools.types.InspectedField.forNamed;
+import static org.apache.ignite.migrationtools.types.TypeInspector.inspectType;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.ignite.examples.model.Address;
+import org.apache.ignite.examples.model.Organization;
+import org.apache.ignite.examples.model.OrganizationType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class TypeInspectorTest {
+    @ParameterizedTest
+    @MethodSource("primitiveTypes")
+    void testPrimitiveFieldType(Class<?> primitiveKlass, String typeName) {
+        List<InspectedField> inspectedTypes = inspectType(primitiveKlass);
+
+        var expected = InspectedField.forUnnamed(typeName, 
InspectedFieldType.PRIMITIVE);
+        assertThat(inspectedTypes).containsExactly(expected);
+    }
+
+    private static Stream<Arguments> primitiveTypes() {
+        return Stream.of(
+                arguments(boolean.class, Boolean.class.getName()),
+                arguments(byte.class, Byte.class.getName()),
+                arguments(char.class, Character.class.getName()),
+                arguments(short.class, Short.class.getName()),
+                arguments(int.class, Integer.class.getName()),
+                arguments(long.class, Long.class.getName()),
+                arguments(double.class, Double.class.getName()),
+                arguments(float.class, Float.class.getName()),
+                arguments(String.class, String.class.getName()),
+                arguments(Boolean.class, Boolean.class.getName()),
+                arguments(Byte.class, Byte.class.getName()),
+                arguments(Character.class, Character.class.getName()),
+                arguments(Short.class, Short.class.getName()),
+                arguments(Integer.class, Integer.class.getName()),
+                arguments(Long.class, Long.class.getName()),
+                arguments(Double.class, Double.class.getName()),
+                arguments(Float.class, Float.class.getName())
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("arrayTypes")
+    void testArrayFieldType(Class<?> primitiveKlass) {
+        List<InspectedField> inspectedTypes = inspectType(primitiveKlass);
+
+        var expected = InspectedField.forUnnamed(primitiveKlass.getName(), 
InspectedFieldType.ARRAY);
+        assertThat(inspectedTypes).containsExactly(expected);
+    }
+
+    private static Stream<Arguments> arrayTypes() {
+        return Stream.of(
+                arguments(boolean[].class),
+                arguments(byte[].class),
+                arguments(char[].class),
+                arguments(short[].class),
+                arguments(int[].class),
+                arguments(long[].class),
+                arguments(double[].class),
+                arguments(float[].class),
+                arguments(String[].class),
+                arguments(Boolean[].class),
+                arguments(Byte[].class),
+                arguments(Character[].class),
+                arguments(Short[].class),
+                arguments(Integer[].class),
+                arguments(Long[].class),
+                arguments(Double[].class),
+                arguments(Float[].class),
+                arguments(List.class)
+        );
+    }
+
+    @Test
+    void testNestedPojoAttribute() {
+        List<InspectedField> inspectedTypes = inspectType(Organization.class);
+
+        InspectedField[] expected = new InspectedField[] {
+                forNamed("id", Long.class.getName(), 
InspectedFieldType.POJO_ATTRIBUTE, true, true),
+                forNamed("name", String.class.getName(), 
InspectedFieldType.POJO_ATTRIBUTE, true, true),
+                forNamed("addr", Address.class.getName(), 
InspectedFieldType.NESTED_POJO_ATTRIBUTE, true, false),
+                forNamed("type", OrganizationType.class.getName(), 
InspectedFieldType.POJO_ATTRIBUTE, true, false),
+                forNamed("lastUpdated", Timestamp.class.getName(), 
InspectedFieldType.POJO_ATTRIBUTE, true, false)
+        };
+
+        assertThat(inspectedTypes).containsExactly(expected);
+    }
+}
diff --git 
a/migration-tools/modules/migration-tools-persistence/src/main/java/org/apache/ignite/migrationtools/persistence/Ignite2PersistentCacheTools.java
 
b/migration-tools/modules/migration-tools-persistence/src/main/java/org/apache/ignite/migrationtools/persistence/Ignite2PersistentCacheTools.java
index 89bbf80fd95..3818786284e 100644
--- 
a/migration-tools/modules/migration-tools-persistence/src/main/java/org/apache/ignite/migrationtools/persistence/Ignite2PersistentCacheTools.java
+++ 
b/migration-tools/modules/migration-tools-persistence/src/main/java/org/apache/ignite/migrationtools/persistence/Ignite2PersistentCacheTools.java
@@ -190,7 +190,7 @@ public class Ignite2PersistentCacheTools {
         String quotedName = IgniteNameUtils.quoteIfNeeded(cacheName);
         @Nullable ClientTable table = (ClientTable) 
client.tables().table(quotedName);
         SqlDdlGenerator.GenerateTableResult tableDefinition = 
sqlGenerator.generate(cacheCfg);
-        Map<String, String> columnToFieldMappings = 
tableDefinition.fieldNameForColumnMappings();
+        Map<String, String> columnToFieldMappings = 
tableDefinition.fieldToColumnMappings();
         if (table == null) {
             table = (ClientTable) client.catalog()
                     .createTableAsync(tableDefinition.tableDefinition())


Reply via email to