pabloem commented on code in PR #24629:
URL: https://github.com/apache/beam/pull/24629#discussion_r1053589819
##########
sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java:
##########
@@ -51,6 +63,91 @@ public class DebeziumIOPostgresSqlConnectorIT {
.withExposedPorts(5432)
.withDatabaseName("inventory");
+ static final Schema TABLE_SCHEMA =
+ Schema.builder()
+ .addInt32Field("id")
+ .addStringField("first_name")
+ .addStringField("last_name")
+ .addStringField("email")
+ .build();
+
+ static DataSource getPostgresDatasource() {
+ PGSimpleDataSource dataSource = new PGSimpleDataSource();
+ dataSource.setDatabaseName("inventory");
+ dataSource.setServerName(POSTGRES_SQL_CONTAINER.getContainerIpAddress());
+ dataSource.setPortNumber(POSTGRES_SQL_CONTAINER.getMappedPort(5432));
+ dataSource.setUser("debezium");
+ dataSource.setPassword("dbz");
+ return dataSource;
+ }
+
+ @Test
+ public void testDebeziumSchemaTransformPostgresRead() throws
InterruptedException {
+ long writeSize = 500L;
+ long testTime = writeSize * 200L;
+ POSTGRES_SQL_CONTAINER.start();
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ Pipeline writePipeline = Pipeline.create(options);
+ writePipeline
+ .apply(GenerateSequence.from(0).to(writeSize).withRate(10,
Duration.standardSeconds(1)))
+ .apply(
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ num ->
+ Row.withSchema(TABLE_SCHEMA)
+ .withFieldValue(
+ "id",
+ // We need this tricky conversion because the
original "customers"
+ // table already
+ // contains rows 1001, 1002, 1003, 1004.
+ num <= 1000
+ ? Long.valueOf(num).intValue()
+ : Long.valueOf(num).intValue() + 4)
+ .withFieldValue("first_name", Long.toString(num))
+ .withFieldValue("last_name",
Long.toString(writeSize - num))
+ .withFieldValue("email", Long.toString(num) +
"@beamail.com")
+ // TODO(pabloem): Add other data types
+ .build()))
+ .setRowSchema(TABLE_SCHEMA)
+ .apply(
+ JdbcIO.<Row>write()
+ .withTable("inventory.inventory.customers")
Review Comment:
I don't think so. I think it's database inventory, schema inventory, table
customers : )
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]