leonardBang commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1687881675
########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/data_types_test.sql: ########## @@ -0,0 +1,28 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +DROP TABLE IF EXISTS DATA_TYPES_TABLE; + +CREATE TABLE DATA_TYPES_TABLE ( + ID INT NOT NULL, + TS DATETIME(0), + NUM DECIMAL(10, 2), + PRIMARY KEY (ID) +);

Review Comment: These types are not that complex from my point of view; could you refer to the `fullTypesTest` case of the MySQL CDC source?

########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java: ########## @@ -56,6 +67,181 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum return fieldGetters; } + /** Restore original data fields from RecordData structure. */ + public static List<Object> restoreOriginalData( + @Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) { + if (recordData == null) { + return Collections.emptyList(); + } + List<Object> actualFields = new ArrayList<>(); + for (RecordData.FieldGetter fieldGetter : fieldGetters) { + actualFields.add(fieldGetter.getFieldOrNull(recordData)); + } + return actualFields; + } + + /** Merge compatible upstream schemas. */ + public static Schema mergeCompatibleSchemas(List<Schema> schemas) { + if (schemas.isEmpty()) { + return null; + } else if (schemas.size() == 1) { + return schemas.get(0); + } else { + Schema outputSchema = null; + for (Schema schema : schemas) { + outputSchema = mergeSchema(outputSchema, schema); + } + return outputSchema; + } + } + + /** Try to combine two schemas with potential incompatible type. 
*/ + @VisibleForTesting + public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) { + if (lSchema == null) { + return rSchema; + } + if (lSchema.getColumnCount() != rSchema.getColumnCount()) { + throw new IllegalStateException( + String.format( + "Unable to merge schema %s and %s with different column counts.", + lSchema, rSchema)); + } + if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) { + throw new IllegalStateException( + String.format( + "Unable to merge schema %s and %s with different primary keys.", + lSchema, rSchema)); + } + if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) { + throw new IllegalStateException( + String.format( + "Unable to merge schema %s and %s with different partition keys.", + lSchema, rSchema)); + } + if (!lSchema.options().equals(rSchema.options())) { + throw new IllegalStateException( + String.format( + "Unable to merge schema %s and %s with different options.", + lSchema, rSchema)); + } + if (!Objects.equals(lSchema.comment(), rSchema.comment())) { + throw new IllegalStateException( + String.format( + "Unable to merge schema %s and %s with different comments.", + lSchema, rSchema)); + } + + List<Column> leftColumns = lSchema.getColumns(); + List<Column> rightColumns = rSchema.getColumns(); + + List<Column> mergedColumns = + IntStream.range(0, lSchema.getColumnCount()) + .mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i))) + .collect(Collectors.toList()); + + return lSchema.copy(mergedColumns); + } + + /** Try to combine two columns with potential incompatible type. */ + @VisibleForTesting + public static Column mergeColumn(Column lColumn, Column rColumn) { + if (!Objects.equals(lColumn.getName(), rColumn.getName())) { + throw new IllegalStateException( + String.format( + "Unable to merge column %s and %s with different name.", + lColumn, rColumn)); + } + if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) { + throw new IllegalStateException( + String.format( + "Unable to merge column %s and %s with different comments.", + lColumn, rColumn)); + } + return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType())); + } + + /** Try to combine given data types to a compatible wider data type. */ + @VisibleForTesting + public static DataType mergeDataType(DataType lType, DataType rType) {

Review Comment: How about renaming these to `inferWiderType`, `inferWiderColumn`, and `inferWiderSchema`?
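(For illustration only, here is a minimal sketch of how the suggested names could map onto the helpers shown in the diff above. The wrapper class `WiderTypeInference` is hypothetical; in practice the `SchemaUtils` methods themselves would simply be renamed.)

```java
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.List;

/** Hypothetical sketch mapping the suggested names onto the current merge* helpers. */
public final class WiderTypeInference {

    private WiderTypeInference() {}

    /** Suggested name for what is currently {@code SchemaUtils.mergeCompatibleSchemas}. */
    public static Schema inferWiderSchema(List<Schema> schemas) {
        return SchemaUtils.mergeCompatibleSchemas(schemas);
    }

    /** Suggested name for what is currently {@code SchemaUtils.mergeColumn}. */
    public static Column inferWiderColumn(Column lColumn, Column rColumn) {
        return SchemaUtils.mergeColumn(lColumn, rColumn);
    }

    /** Suggested name for what is currently {@code SchemaUtils.mergeDataType}. */
    public static DataType inferWiderType(DataType lType, DataType rType) {
        return SchemaUtils.mergeDataType(lType, rType);
    }
}
```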
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ########## @@ -30,74 +29,81 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.utils.SchemaUtils; -import org.apache.flink.cdc.common.utils.StringUtils; -import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; -import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import javax.annotation.Nullable; import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -/** A data process function that applies user-defined transform logics. */ -public class TransformDataOperator extends AbstractStreamOperator<Event> +/** + * A data process function that performs column filtering, calculated column evaluation & final + * projection. + */ +public class PostTransformOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event> {

Review Comment: Could you add a `serialVersionUID` to all serializable `StreamOperator`s?

########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformChangeInfo.java: ########## @@ -37,26 +37,27 @@ import java.util.List; /** The TableInfo applies to cache schema change and fieldGetters. */ -public class TableChangeInfo {

Review Comment: Could you update the description?

########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformChangeInfo.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; + +import java.util.List; + +/** The TableInfo applies to cache schema and fieldGetters. */ +public class PostTransformChangeInfo {

Review Comment: Same here: could you update this description too?

########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.schema.Selectors; + +import javax.annotation.Nullable; + +import java.util.Optional; + +/** Transformation rules used by {@link PostTransformOperator}. */ +public class PostTransformers {

Review Comment: A `TransformRule` should map to a `PostTransformer` instead of a `PostTransformers`, right? Could you also update the Javadoc?

########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/ComplexDataTypesE2eITCase.java: ########## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.lifecycle.Startables; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** End-to-end tests for complex data types. */ +public class ComplexDataTypesE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(ComplexDataTypesE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + public static final int TESTCASE_TIMEOUT_SECONDS = 60; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases("mysql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @ClassRule + public static final DorisContainer DORIS = + new DorisContainer(NETWORK) + .withNetworkAliases("doris") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected final UniqueDatabase complexDataTypesDatabase = + new UniqueDatabase(MYSQL, "data_types_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @BeforeClass + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + Startables.deepStart(Stream.of(DORIS)).join(); + LOG.info("Waiting for backends to be available"); + long startWaitingTimestamp = System.currentTimeMillis(); + + new LogMessageWaitStrategy() + .withRegEx(".*get heartbeat from FE.*") + .withTimes(1) + .withStartupTimeout( + Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + 
.waitUntilReady(DORIS); + + while (!checkBackendAvailability()) { + try { + if (System.currentTimeMillis() - startWaitingTimestamp + > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + throw new RuntimeException("Doris backend startup timed out."); + } + LOG.info("Waiting for backends to be available"); + Thread.sleep(1000); + } catch (InterruptedException ignored) { + // ignore and check next round + } + } + LOG.info("Containers are started."); + } + + @Before + public void before() throws Exception { + super.before(); + complexDataTypesDatabase.createAndInitialize(); + createDorisDatabase(complexDataTypesDatabase.getDatabaseName()); + } + + private static boolean checkBackendAvailability() { + try { + Container.ExecResult rs = + DORIS.execInContainer( + "mysql", + "--protocol=TCP", + "-uroot", + "-P9030", + "-h127.0.0.1", + "-e SHOW BACKENDS\\G"); + + if (rs.getExitCode() != 0) { + return false; + } + String output = rs.getStdout(); + LOG.info("Doris backend status:\n{}", output); + return output.contains("*************************** 1. row ***************************") + && !output.contains("AvailCapacity: 1.000 B"); + } catch (Exception e) { + LOG.info("Failed to check backend status.", e); + return false; + } + } + + @After + public void after() { + super.after(); + complexDataTypesDatabase.dropDatabase(); + dropDorisDatabase(complexDataTypesDatabase.getDatabaseName()); + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: doris\n" + + " fenodes: doris:8030\n" + + " benodes: doris:8040\n" + + " username: %s\n" + + " password: \"%s\"\n" + + " table.create.properties.replication_num: 1\n" + + "\n" + + "transform:\n" + + " - source-table: %s.DATA_TYPES_TABLE\n" + + " projection: \\*, 'fine' AS FINE\n"

Review Comment: Could you add a filter here to cover more logic, since e2e cases are more expensive than other tests?

########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java: ########## @@ -0,0 +1,919 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** Unit tests for the {@link PreTransformOperator} and {@link PostTransformOperator}. */ +public class UnifiedTransformOperatorTest { + + /** Defines a unified transform test cases. */ + static class UnifiedTransformTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(UnifiedTransformTestCase.class); + + private final TableId tableId; + private final String projectionExpression; + private final String filterExpression; + + private Schema sourceSchema; + private Schema preTransformedSchema; + private Schema postTransformedSchema; + + private final List<Event> sourceEvents; + private final List<Event> preTransformedEvents; + private final List<Event> postTransformedEvents; + + private final List<RecordData.FieldGetter> sourceFieldGetters; + private final List<RecordData.FieldGetter> preTransformedFieldGetters; + private final List<RecordData.FieldGetter> postTransformedFieldGetters; + + private PreTransformOperator preTransformOperator; + private PostTransformOperator postTransformOperator; + + private final BinaryRecordDataGenerator sourceRecordGenerator; + private final BinaryRecordDataGenerator preTransformedRecordGenerator; + private final BinaryRecordDataGenerator postTransformedRecordGenerator; + + private EventOperatorTestHarness<PreTransformOperator, Event> preTransformOperatorHarness; + private EventOperatorTestHarness<PostTransformOperator, Event> postTransformOperatorHarness; + + public static UnifiedTransformTestCase of( + TableId tableId, + String projectionExpression, + String filterExpression, + Schema sourceSchema, + Schema preTransformedSchema, + Schema postTransformedSchema) { + return new UnifiedTransformTestCase( + tableId, + projectionExpression, + filterExpression, + sourceSchema, + preTransformedSchema, + postTransformedSchema); + } + + private Object[] stringify(Object... objects) { + return Arrays.stream(objects) + .map(o -> o instanceof String ? new BinaryStringData((String) o) : o) + .toArray(); + } + + public UnifiedTransformTestCase insertSource(Object... record) { + sourceEvents.add( + DataChangeEvent.insertEvent( + tableId, sourceRecordGenerator.generate(stringify(record)))); + return this; + } + + public UnifiedTransformTestCase insertPreTransformed() { + preTransformedEvents.add(null); + return this; + } + + public UnifiedTransformTestCase insertPreTransformed(Object... 
record) { + preTransformedEvents.add( + DataChangeEvent.insertEvent( + tableId, preTransformedRecordGenerator.generate(stringify(record)))); + return this; + } + + public UnifiedTransformTestCase insertPostTransformed() { + postTransformedEvents.add(null); + return this; + } + + public UnifiedTransformTestCase insertPostTransformed(Object... record) { + postTransformedEvents.add( + DataChangeEvent.insertEvent( + tableId, postTransformedRecordGenerator.generate(stringify(record)))); + return this; + } + + public UnifiedTransformTestCase updateSource(Object[] beforeRecord, Object[] afterRecord) { + sourceEvents.add( + DataChangeEvent.updateEvent( + tableId, + sourceRecordGenerator.generate(stringify(beforeRecord)), + sourceRecordGenerator.generate(stringify(afterRecord)))); + return this; + } + + public UnifiedTransformTestCase updatePreTransformed() { + preTransformedEvents.add(null); + return this; + } + + public UnifiedTransformTestCase updatePreTransformed( + Object[] beforeRecord, Object[] afterRecord) { + preTransformedEvents.add( + DataChangeEvent.updateEvent( + tableId, + preTransformedRecordGenerator.generate(stringify(beforeRecord)), + preTransformedRecordGenerator.generate(stringify(afterRecord)))); + return this; + } + + public UnifiedTransformTestCase updatePostTransformed() { + postTransformedEvents.add(null); + return this; + } + + public UnifiedTransformTestCase updatePostTransformed( + Object[] beforeRecord, Object[] afterRecord) { + postTransformedEvents.add( + DataChangeEvent.updateEvent( + tableId, + postTransformedRecordGenerator.generate(stringify(beforeRecord)), + postTransformedRecordGenerator.generate(stringify(afterRecord)))); + return this; + } + + public UnifiedTransformTestCase deleteSource(Object... record) { + sourceEvents.add( + DataChangeEvent.deleteEvent( + tableId, sourceRecordGenerator.generate(stringify(record)))); + return this; + } + + public UnifiedTransformTestCase deletePreTransformed() { + preTransformedEvents.add(null); + return this; + } + + public UnifiedTransformTestCase deletePreTransformed(Object... record) { + preTransformedEvents.add( + DataChangeEvent.deleteEvent( + tableId, preTransformedRecordGenerator.generate(stringify(record)))); + return this; + } + + public UnifiedTransformTestCase deletePostTransformed() { + postTransformedEvents.add(null); + return this; + } + + public UnifiedTransformTestCase deletePostTransformed(Object... 
record) { + postTransformedEvents.add( + DataChangeEvent.deleteEvent( + tableId, postTransformedRecordGenerator.generate(stringify(record)))); + return this; + } + + private UnifiedTransformTestCase( + TableId tableId, + String projectionExpression, + String filterExpression, + Schema sourceSchema, + Schema preTransformedSchema, + Schema postTransformedSchema) { + this.tableId = tableId; + this.projectionExpression = projectionExpression; + this.filterExpression = filterExpression; + + this.sourceSchema = sourceSchema; + this.preTransformedSchema = preTransformedSchema; + this.postTransformedSchema = postTransformedSchema; + + this.sourceRecordGenerator = + new BinaryRecordDataGenerator(((RowType) sourceSchema.toRowDataType())); + this.preTransformedRecordGenerator = + new BinaryRecordDataGenerator(((RowType) preTransformedSchema.toRowDataType())); + this.postTransformedRecordGenerator = + new BinaryRecordDataGenerator( + ((RowType) postTransformedSchema.toRowDataType())); + + this.sourceEvents = new ArrayList<>(); + this.preTransformedEvents = new ArrayList<>(); + this.postTransformedEvents = new ArrayList<>(); + + this.sourceEvents.add(new CreateTableEvent(tableId, sourceSchema)); + this.preTransformedEvents.add(new CreateTableEvent(tableId, preTransformedSchema)); + this.postTransformedEvents.add(new CreateTableEvent(tableId, postTransformedSchema)); + + this.sourceFieldGetters = SchemaUtils.createFieldGetters(sourceSchema); + this.preTransformedFieldGetters = SchemaUtils.createFieldGetters(preTransformedSchema); + this.postTransformedFieldGetters = + SchemaUtils.createFieldGetters(postTransformedSchema); + } + + private UnifiedTransformTestCase initializeHarness() throws Exception { + preTransformOperator = + PreTransformOperator.newBuilder() + .addTransform( + tableId.identifier(), projectionExpression, filterExpression) + .build(); + preTransformOperatorHarness = new EventOperatorTestHarness<>(preTransformOperator, 1); + preTransformOperatorHarness.open(); + + postTransformOperator = + PostTransformOperator.newBuilder() + .addTransform( + tableId.identifier(), projectionExpression, filterExpression) + .build(); + postTransformOperatorHarness = new EventOperatorTestHarness<>(postTransformOperator, 1); + postTransformOperatorHarness.open(); + return this; + } + + private void destroyHarness() throws Exception { + if (preTransformOperatorHarness != null) { + preTransformOperatorHarness.close(); + } + if (postTransformOperatorHarness != null) { + postTransformOperatorHarness.close(); + } + } + + private void logBinaryDataContents( + String prefix, Event event, List<RecordData.FieldGetter> fieldGetters) { + LOG.info("{}: {}", prefix, event); + if (event instanceof DataChangeEvent) { + LOG.info( + " Before Record Data: {}", + SchemaUtils.restoreOriginalData( + ((DataChangeEvent) event).before(), fieldGetters)); + LOG.info( + " After Record Data: {}", + SchemaUtils.restoreOriginalData( + ((DataChangeEvent) event).after(), fieldGetters)); + } + } + + public UnifiedTransformTestCase runTests() throws Exception { + for (int i = 0; i < sourceEvents.size(); i++) { + Event sourceEvent = sourceEvents.get(i); + logBinaryDataContents("Source Event", sourceEvent, sourceFieldGetters); + + preTransformOperator.processElement(new StreamRecord<>(sourceEvent)); + + Event expectedPreTransformEvent = preTransformedEvents.get(i); + Event actualPreTransformEvent = + Optional.ofNullable(preTransformOperatorHarness.getOutputRecords().poll()) + .map(StreamRecord::getValue) + .orElse(null); + + 
logBinaryDataContents( + "Expected PreTransform ", + expectedPreTransformEvent, + preTransformedFieldGetters); + logBinaryDataContents( + " Actual PreTransform ", + actualPreTransformEvent, + preTransformedFieldGetters); + Assertions.assertThat(actualPreTransformEvent).isEqualTo(expectedPreTransformEvent); + + postTransformOperator.processElement( + new StreamRecord<>(preTransformedEvents.get(i))); + Event expectedPostTransformEvent = postTransformedEvents.get(i); + Event actualPostTransformEvent = + Optional.ofNullable(postTransformOperatorHarness.getOutputRecords().poll()) + .map(StreamRecord::getValue) + .orElse(null); + logBinaryDataContents( + "Expected PostTransform", + expectedPostTransformEvent, + postTransformedFieldGetters); + logBinaryDataContents( + " Actual PostTransform", + actualPostTransformEvent, + postTransformedFieldGetters); + Assertions.assertThat(actualPostTransformEvent) + .isEqualTo(expectedPostTransformEvent); + } + + sourceEvents.clear(); + preTransformedEvents.clear(); + postTransformedEvents.clear(); + return this; + } + } + + @Test + public void testDataChangeEventTransform() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "data_changes"); + UnifiedTransformTestCase.of( + tableId, + "id, age, id + age as computed", + "id > 100", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("computed", DataTypes.INT()) + .primaryKey("id") + .build()) + .initializeHarness() + .insertSource(1000, "Alice", 17) + .insertPreTransformed(1000, 17) + .insertPostTransformed(1000, 17, 1017) + .insertSource(2000, "Bob", 18) + .insertPreTransformed(2000, 18) + .insertPostTransformed(2000, 18, 2018) + .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16}) + .updatePreTransformed(new Object[] {2000, 18}, new Object[] {2000, 16}) + .updatePostTransformed(new Object[] {2000, 18, 2018}, new Object[] {2000, 16, 2016}) + // filtered out data row + .insertSource(50, "Carol", 19) + .insertPreTransformed(50, 19) + .insertPostTransformed() + .deleteSource(1000, "Alice", 17) + .deletePreTransformed(1000, 17) + .deletePostTransformed(1000, 17, 1017) + .runTests() + .destroyHarness(); + } + + @Test + public void testSchemaNullabilityTransform() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "schema_nullability"); + UnifiedTransformTestCase.of( + tableId, + "id, name, age, id + age as computed", + "id > 100", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .physicalColumn("computed", DataTypes.INT()) + .primaryKey("id") + .build()) + .initializeHarness() + .insertSource(1000, "Alice", 17) + 
.insertPreTransformed(1000, "Alice", 17) + .insertPostTransformed(1000, "Alice", 17, 1017) + .insertSource(2000, "Bob", 18) + .insertPreTransformed(2000, "Bob", 18) + .insertPostTransformed(2000, "Bob", 18, 2018) + .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16}) + .updatePreTransformed( + new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16}) + .updatePostTransformed( + new Object[] {2000, "Bob", 18, 2018}, + new Object[] {2000, "Barcarolle", 16, 2016}) + // filtered out data row + .insertSource(50, "Carol", 19) + .insertPreTransformed(50, "Carol", 19) + .insertPostTransformed() + .deleteSource(1000, "Alice", 17) + .deletePreTransformed(1000, "Alice", 17) + .deletePostTransformed(1000, "Alice", 17, 1017) + .runTests() + .destroyHarness(); + } + + @Test + public void testReduceColumnsTransform() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "reduce_column"); + UnifiedTransformTestCase.of( + tableId, + "id, upper(id) as uid, age + 1 as newage, lower(ref1) as lowerref", + "newage > 17 and ref2 > 17", + Schema.newBuilder() + .physicalColumn("id", DataTypes.STRING().notNull()) + .physicalColumn("name", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .physicalColumn("ref1", DataTypes.STRING()) + .physicalColumn("ref2", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.STRING().notNull()) + .physicalColumn("age", DataTypes.INT().notNull()) + .physicalColumn("ref1", DataTypes.STRING()) + .physicalColumn("ref2", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.STRING().notNull()) + .physicalColumn("uid", DataTypes.STRING()) + .physicalColumn("newage", DataTypes.INT()) + .physicalColumn("lowerref", DataTypes.STRING()) + .primaryKey("id") + .build()) + .initializeHarness() + .insertSource("id001", "Alice", 17, "Reference001", 2021) + .insertPreTransformed("id001", 17, "Reference001", 2021) + .insertPostTransformed("id001", "ID001", 18, "reference001") + // this data record is filtered out since newage <= 17 + .insertSource("id002", "Bob", 15, "Reference002", 2017) + .insertPreTransformed("id002", 15, "Reference002", 2017) + .insertPostTransformed() + // this data record is filtered out since ref2 <= 17 + .insertSource("id003", "Bill", 18, "Reference003", 0) + .insertPreTransformed("id003", 18, "Reference003", 0) + .insertPostTransformed() + .insertSource("id004", "Carol", 18, "Reference004", 2018) + .insertPreTransformed("id004", 18, "Reference004", 2018) + .insertPostTransformed("id004", "ID004", 19, "reference004") + // test update event transform + .updateSource( + new Object[] {"id004", "Carol", 18, "Reference004", 2018}, + new Object[] {"id004", "Colin", 18, "NeoReference004", 2018}) + .updatePreTransformed( + new Object[] {"id004", 18, "Reference004", 2018}, + new Object[] {"id004", 18, "NeoReference004", 2018}) + .updatePostTransformed( + new Object[] {"id004", "ID004", 19, "reference004"}, + new Object[] {"id004", "ID004", 19, "neoreference004"}) + // updated value to a filtered out condition + .updateSource( + new Object[] {"id004", "Colin", 18, "NeoReference004", 2018}, + new Object[] {"id004", "Colin", 10, "NeoReference004", 2018}) + .updatePreTransformed( + new Object[] {"id004", 18, "NeoReference004", 2018}, + new Object[] {"id004", 10, "NeoReference004", 2018}) + .updatePostTransformed() + .deleteSource("id001", "Alice", 17, "Reference001", 2021) + 
.deletePreTransformed("id001", 17, "Reference001", 2021) + .deletePostTransformed("id001", "ID001", 18, "reference001") + .runTests() + .destroyHarness(); + } + + @Test + public void testWildcardTransform() throws Exception { + TableId tableId = TableId.tableId("my_company", "my_branch", "wildcard"); + UnifiedTransformTestCase.of( + tableId, + "*, id + age as computed", + "id > 100", + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .primaryKey("id") + .build(), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("computed", DataTypes.INT()) + .primaryKey("id") + .build()) + .initializeHarness() + .insertSource(1000, "Alice", 17) + .insertPreTransformed(1000, "Alice", 17) + .insertPostTransformed(1000, "Alice", 17, 1017) + .insertSource(2000, "Bob", 18) + .insertPreTransformed(2000, "Bob", 18) + .insertPostTransformed(2000, "Bob", 18, 2018) + .updateSource(new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16}) + .updatePreTransformed( + new Object[] {2000, "Bob", 18}, new Object[] {2000, "Barcarolle", 16}) + .updatePostTransformed( + new Object[] {2000, "Bob", 18, 2018}, + new Object[] {2000, "Barcarolle", 16, 2016}) + // filtered out data row + .insertSource(50, "Carol", 19) + .insertPreTransformed(50, "Carol", 19) + .insertPostTransformed() + .deleteSource(1000, "Alice", 17) + .deletePreTransformed(1000, "Alice", 17) + .deletePostTransformed(1000, "Alice", 17, 1017) + .runTests() + .destroyHarness(); + + UnifiedTransformTestCase.of( + tableId, + "id + age as computed, *",

Review Comment: Could you add a test to cover the cast case, like `id + age as computed, CAST(computed AS BIGINT) as larger_computed`?
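(To make the last request concrete, below is a rough, unverified sketch of such a case, reusing the `UnifiedTransformTestCase` helper defined in this test class. The table name, expected schemas, and expected rows are assumptions; in particular, whether a later projection expression may reference the `computed` alias and which type the transform parser infers for the `CAST` result would need checking against the actual implementation, so the sketch casts the expanded expression `id + age` rather than the alias.)

```java
// Hypothetical extra case for UnifiedTransformOperatorTest, modelled on testWildcardTransform.
// Expected schemas and rows are illustrative assumptions, not verified against the parser.
TableId tableId = TableId.tableId("my_company", "my_branch", "wildcard_cast");
UnifiedTransformTestCase.of(
                tableId,
                "*, id + age as computed, CAST(id + age AS BIGINT) as larger_computed",
                "id > 100",
                // source schema
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .physicalColumn("age", DataTypes.INT())
                        .primaryKey("id")
                        .build(),
                // pre-transformed schema: the wildcard keeps every source column
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .physicalColumn("age", DataTypes.INT())
                        .primaryKey("id")
                        .build(),
                // post-transformed schema: BIGINT is an assumed result type for the CAST
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .physicalColumn("age", DataTypes.INT())
                        .physicalColumn("computed", DataTypes.INT())
                        .physicalColumn("larger_computed", DataTypes.BIGINT())
                        .primaryKey("id")
                        .build())
        .initializeHarness()
        .insertSource(1000, "Alice", 17)
        .insertPreTransformed(1000, "Alice", 17)
        .insertPostTransformed(1000, "Alice", 17, 1017, 1017L)
        // filtered out data row
        .insertSource(50, "Carol", 19)
        .insertPreTransformed(50, "Carol", 19)
        .insertPostTransformed()
        .runTests()
        .destroyHarness();
```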
