ahmedabu98 commented on code in PR #35228:
URL: https://github.com/apache/beam/pull/35228#discussion_r2140260220
##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java:
##########
@@ -851,6 +857,113 @@ public void testStreamToPartitionedDynamicDestinations()
throws IOException {
writeToDynamicDestinations(null, true, true);
}
+ @Test
+ public void testWriteToDynamicNamespaces() throws IOException {
+ // run this test only on catalogs that support namespace management
+ assumeTrue(catalog instanceof SupportsNamespaces);
+
+ long salt = System.currentTimeMillis();
+ String tableIdentifierTemplate =
+ String.format("namespace_{modulo_5}_%s.table_{bool_field}", salt);
+ Map<String, Object> writeConfig = new
HashMap<>(managedIcebergConfig(tableIdentifierTemplate));
+ // override with table template
+ writeConfig.put("table", tableIdentifierTemplate);
+
+ Namespace namespace0 = Namespace.of("namespace_0_" + salt);
+ Namespace namespace1 = Namespace.of("namespace_1_" + salt);
+ Namespace namespace2 = Namespace.of("namespace_2_" + salt);
+ Namespace namespace3 = Namespace.of("namespace_3_" + salt);
+ Namespace namespace4 = Namespace.of("namespace_4_" + salt);
+
+ TableIdentifier tableId0true = TableIdentifier.of(namespace0,
"table_true");
+ TableIdentifier tableId0false = TableIdentifier.of(namespace0,
"table_false");
+ TableIdentifier tableId1true = TableIdentifier.of(namespace1,
"table_true");
+ TableIdentifier tableId1false = TableIdentifier.of(namespace1,
"table_false");
+ TableIdentifier tableId2true = TableIdentifier.of(namespace2,
"table_true");
+ TableIdentifier tableId2false = TableIdentifier.of(namespace2,
"table_false");
+ TableIdentifier tableId3true = TableIdentifier.of(namespace3,
"table_true");
+ TableIdentifier tableId3false = TableIdentifier.of(namespace3,
"table_false");
+ TableIdentifier tableId4true = TableIdentifier.of(namespace4,
"table_true");
+ TableIdentifier tableId4false = TableIdentifier.of(namespace4,
"table_false");
+
+ List<Namespace> namespaces =
+ Arrays.asList(namespace0, namespace1, namespace2, namespace3,
namespace4);
+ SupportsNamespaces sN = (SupportsNamespaces) catalog;
+ // assert namespaces don't exist beforehand
+ namespaces.forEach(n -> assertFalse(sN.namespaceExists(n)));
+
+ pipeline
+ .apply(Create.of(inputRows))
+ .setRowSchema(BEAM_SCHEMA)
+ .apply(Managed.write(ICEBERG).withConfig(writeConfig));
+ pipeline.run().waitUntilFinish();
+
+ // assert namespaces were created
+ namespaces.forEach(n -> assertTrue(sN.namespaceExists(n)));
+
+ Table table0true = catalog.loadTable(tableId0true);
+ Table table0false = catalog.loadTable(tableId0false);
+ Table table1true = catalog.loadTable(tableId1true);
+ Table table1false = catalog.loadTable(tableId1false);
+ Table table2true = catalog.loadTable(tableId2true);
+ Table table2false = catalog.loadTable(tableId2false);
+ Table table3true = catalog.loadTable(tableId3true);
+ Table table3false = catalog.loadTable(tableId3false);
+ Table table4true = catalog.loadTable(tableId4true);
+ Table table4false = catalog.loadTable(tableId4false);
+
+ for (Table t :
+ Arrays.asList(
+ table0true,
+ table0false,
+ table1true,
+ table1false,
+ table2true,
+ table2false,
+ table3true,
+ table3false,
+ table4true,
+ table4false)) {
+ assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA));
+ }
+
+ // Read back and check records are correct
+ Map<KV<Long, Boolean>, List<Record>> results =
+ ImmutableMap.<KV<Long, Boolean>, List<Record>>builder()
+ .put(KV.of(0L, true), readRecords(table0true))
+ .put(KV.of(0L, false), readRecords(table0false))
+ .put(KV.of(1L, true), readRecords(table1true))
+ .put(KV.of(1L, false), readRecords(table1false))
+ .put(KV.of(2L, true), readRecords(table2true))
+ .put(KV.of(2L, false), readRecords(table2false))
+ .put(KV.of(3L, true), readRecords(table3true))
+ .put(KV.of(3L, false), readRecords(table3false))
+ .put(KV.of(4L, true), readRecords(table4true))
+ .put(KV.of(4L, false), readRecords(table4false))
+ .build();
+
+ for (Map.Entry<KV<Long, Boolean>, List<Record>> entry :
results.entrySet()) {
+ long modulo = entry.getKey().getKey();
+ boolean bool = entry.getKey().getValue();
+ List<Record> records = entry.getValue();
+ Stream<Record> expectedRecords =
+ inputRows.stream()
+ .filter(
+ rec ->
+ checkStateNotNull(rec.getInt64("modulo_5")) == modulo
+ && checkStateNotNull(rec.getBoolean("bool_field"))
== bool)
+ .map(RECORD_FUNC::apply);
+
+ assertThat(records, containsInAnyOrder(expectedRecords.toArray()));
+ }
+
+ try {
+ namespaces.forEach(sN::dropNamespace);
+ } catch (Exception e) {
+ LOG.error("Test passed but threw an error when cleaning up namespaces.",
e);
Review Comment:
Done, will merge when tests go green
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]