Elbehery commented on code in PR #15615:
URL: https://github.com/apache/iceberg/pull/15615#discussion_r3317712762


##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java:
##########
@@ -96,4 +102,133 @@ public void testAutoCreateTable(boolean partitioned) {
     assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", 
"foo2"));
     assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", 
"foo2", "foo3"));
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithIdentifierFields() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID columns
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id", "data"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of("test-prop", 
"foo1"));
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create a Kafka schema with required fields for identifier columns
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA) // 
required
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA) 
// required
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo2");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+    factory.autoCreateTable("foo.bar", record);
+
+    ArgumentCaptor<TableIdentifier> identCaptor = 
ArgumentCaptor.forClass(TableIdentifier.class);
+    ArgumentCaptor<Schema> schemaCaptor = 
ArgumentCaptor.forClass(Schema.class);
+    ArgumentCaptor<PartitionSpec> specCaptor = 
ArgumentCaptor.forClass(PartitionSpec.class);
+    ArgumentCaptor<Map<String, String>> propsCaptor = 
ArgumentCaptor.forClass(Map.class);
+
+    verify(catalog)
+        .createTable(
+            identCaptor.capture(),
+            schemaCaptor.capture(),
+            specCaptor.capture(),
+            propsCaptor.capture());
+
+    Schema schema = schemaCaptor.getValue();
+    assertThat(schema.findField("id").type()).isEqualTo(LongType.get());
+    assertThat(schema.findField("data").type()).isEqualTo(StringType.get());
+
+    // Verify identifier field IDs are set correctly
+    Set<Integer> identifierFieldIds = schema.identifierFieldIds();
+    assertThat(identifierFieldIds)
+        .as("Schema should have identifier field IDs set")
+        .containsExactlyInAnyOrder(
+            schema.findField("id").fieldId(), 
schema.findField("data").fieldId());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithMissingIdentifierField() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column that doesn't exist in the data
+    
when(tableConfig.idColumns()).thenReturn(ImmutableList.of("missing_column"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema without the missing column
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", org.apache.kafka.connect.data.Schema.INT64_SCHEMA)
+            .field("data", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
+            .build();
+
+    Struct value = new Struct(valueSchema).put("id", 123L).put("data", "foo");
+
+    SinkRecord record = mock(SinkRecord.class);
+    when(record.valueSchema()).thenReturn(valueSchema);
+    when(record.value()).thenReturn(value);
+
+    IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
+
+    // Should throw DataException when ID column is not found in schema
+    assertThatThrownBy(() -> factory.autoCreateTable("foo.bar", record))
+        .isInstanceOf(DataException.class)
+        .hasMessageContaining("ID column 'missing_column' not found in schema")
+        .hasMessageContaining("Available columns:");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAutoCreateTableWithOptionalIdentifierField() {
+    Catalog catalog = mock(Catalog.class, 
withSettings().extraInterfaces(SupportsNamespaces.class));
+    when(catalog.loadTable(any())).thenThrow(new NoSuchTableException("no such 
table"));
+
+    TableSinkConfig tableConfig = mock(TableSinkConfig.class);
+    when(tableConfig.partitionBy()).thenReturn(ImmutableList.of());
+    // Configure ID column
+    when(tableConfig.idColumns()).thenReturn(ImmutableList.of("id"));
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.autoCreateProps()).thenReturn(ImmutableMap.of());
+    when(config.tableConfig(any())).thenReturn(tableConfig);
+
+    // Create schema with optional identifier field
+    org.apache.kafka.connect.data.Schema valueSchema =
+        SchemaBuilder.struct()
+            .field("id", 
org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA) // optional!

Review Comment:
   I will open a follow-up PR that covers the changes discussed here and in 
https://github.com/apache/iceberg/pull/15615/changes#r3311617558 👍🏽 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to