fapaul commented on code in PR #19655: URL: https://github.com/apache/flink/pull/19655#discussion_r867894179
########## flink-connectors/flink-connector-upsert-test/pom.xml: ########## @@ -0,0 +1,151 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-connectors</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.16-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-connector-upsert-test</artifactId> + <name>Flink : Connectors : Upsert Test</name> + + <packaging>jar</packaging> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-json</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- ArchUnit test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-architecture-tests-test</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> +
<artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> Review Comment: Why do you need to build a test-jar? ########## flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSink.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.upserttest.sink.UpsertTestSink; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.io.File; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DynamicTableSink} that describes how to create a {@link UpsertTestSink} from a logical + * description. + */ +@Internal Review Comment: Nit: Theoretically for package-private classes the annotation is not necessary but it is not a bad idea to still keep it. ########## flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Collection of utility methods for tests using the {@link UpsertTestSink}. */ +@Internal +public class BinaryFileUtil { + private static final byte MAGIC_BYTE = 13; + + /** + * Reads records that were written using the {@link BinaryFileUtil#writeRecords} method from the + * given InputStream. + * + * @param bis The BufferedInputStream to read from + * @return Map containing the read ByteBuffer key-value pairs + * @throws IOException if reading from the stream fails + */ + public static Map<ByteBuffer, ByteBuffer> readRecords(BufferedInputStream bis) + throws IOException { + checkNotNull(bis); + Map<ByteBuffer, ByteBuffer> records = new HashMap<>(); Review Comment: I am afraid using `ByteBuffer`s as map keys is fragile. `ByteBuffer#hashCode` and `ByteBuffer#equals` are defined over the buffer's *remaining* elements, i.e. they depend on the current position and limit rather than only on the backing content. Any change to a buffer's position silently changes its hash, which can easily lead to lost or duplicated entries in the map (a possible content-based key wrapper is sketched after the next comment). ########## flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link UpsertTestSinkWriter}. */ +@ExtendWith(TestLoggerExtension.class) +class UpsertTestSinkWriterITCase { + + @Test + public void testWrite(@TempDir File tempDir) throws Exception { Review Comment: Why did you pass the `@TempDir` as a parameter instead of defining it on the class level as an instance variable? Is there a benefit?
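Regarding the `ByteBuffer`-as-map-key concern above: a minimal sketch of a content-based key wrapper that side-steps the position/limit sensitivity of `ByteBuffer#equals`/`hashCode`. The class name `BytesKey` is an assumption for illustration, not code from the PR.

```java
import java.util.Arrays;

/**
 * Hypothetical replacement for ByteBuffer map keys: equality and hashing are
 * derived from the full byte content only, never from position or limit.
 */
final class BytesKey {
    private final byte[] bytes;

    BytesKey(byte[] bytes) {
        // Defensive copy so later mutation of the input array cannot corrupt the key.
        this.bytes = bytes.clone();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BytesKey && Arrays.equals(bytes, ((BytesKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}
```

The writer and `BinaryFileUtil` would then operate on a `Map<BytesKey, byte[]>` instead of a `Map<ByteBuffer, ByteBuffer>`.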
########## flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkBuilder.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; + +import java.io.File; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Builder to construct {@link UpsertTestSink}. + * + * <p>The following example shows the minimum setup to create an UpsertTestSink that writes {@code + * Tuple2<String, String>} values to a file. + * + * <pre>{@code + * UpsertTestSink<Tuple2<String, String>> sink = UpsertTestSink + * .<Tuple2<String, String>>builder() + * .setOutputFile(MY_OUTPUT_FILE) + * .setKeySerializationSchema(MY_KEY_SERIALIZER) + * .setValueSerializationSchema(MY_VALUE_SERIALIZER) + * .build(); + * }</pre> + + * + * @param <IN> type of the records written to the file + */ +@PublicEvolving +public class UpsertTestSinkBuilder<IN> { + + private File outputFile; + private SerializationSchema<IN> keySerializationSchema; + private SerializationSchema<IN> valueSerializationSchema; + + /** + * Sets the output {@link File} to write to. + * + * @param outputFile the output file to write to + * @return {@link UpsertTestSinkBuilder} + */ + public UpsertTestSinkBuilder<IN> setOutputFile(File outputFile) { + this.outputFile = checkNotNull(outputFile); + return this; + } + + /** + * Sets the key {@link SerializationSchema} that transforms incoming records to byte[]. + * + * @param keySerializationSchema serializer used for the key part of each record + * @return {@link UpsertTestSinkBuilder} + */ + public UpsertTestSinkBuilder<IN> setKeySerializationSchema( + SerializationSchema<IN> keySerializationSchema) { + this.keySerializationSchema = checkNotNull(keySerializationSchema); + return this; + } + + /** + * Sets the value {@link SerializationSchema} that transforms incoming records to byte[]. + * + * @param valueSerializationSchema serializer used for the value part of each record + * @return {@link UpsertTestSinkBuilder} + */ + public UpsertTestSinkBuilder<IN> setValueSerializationSchema( + SerializationSchema<IN> valueSerializationSchema) { + this.valueSerializationSchema = checkNotNull(valueSerializationSchema); + return this; + } + + /** + * Constructs the {@link UpsertTestSink} with the configured properties. + * + * @return {@link UpsertTestSink} + */ + public UpsertTestSink<IN> build() { + return new UpsertTestSink<>(outputFile, keySerializationSchema, valueSerializationSchema); Review Comment: Nit: Please also validate that the necessary fields of the builder are not null.
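A minimal sketch of the validating `build()` this nit asks for, reusing the `checkNotNull` overload with a message that is already imported in the class; the error messages are illustrative:

```java
/** Constructs the {@link UpsertTestSink} after verifying all mandatory fields are set. */
public UpsertTestSink<IN> build() {
    checkNotNull(outputFile, "outputFile must be set before calling build()");
    checkNotNull(keySerializationSchema, "keySerializationSchema must be set before calling build()");
    checkNotNull(valueSerializationSchema, "valueSerializationSchema must be set before calling build()");
    return new UpsertTestSink<>(outputFile, keySerializationSchema, valueSerializationSchema);
}
```

Failing fast in `build()` turns a later, harder-to-trace `NullPointerException` in the writer into an immediate error at sink construction time.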
########## flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.util.FlinkRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is responsible for writing records into a file in an upsert-fashion. On every + * checkpoint each key-value pair currently in the map is written to the file. + * + * @param <IN> The type of the input elements. + */ +class UpsertTestSinkWriter<IN> implements SinkWriter<IN> { + private static final Logger LOG = LoggerFactory.getLogger(UpsertTestSinkWriter.class); + + private final SerializationSchema<IN> keySerializationSchema; + private final SerializationSchema<IN> valueSerializationSchema; + private final Map<ByteBuffer, ByteBuffer> records = new HashMap<>(); + private final BufferedOutputStream bufferedOutputStream; + + UpsertTestSinkWriter( + File outputFile, + SerializationSchema<IN> keySerializationSchema, + SerializationSchema<IN> valueSerializationSchema) { + this.keySerializationSchema = checkNotNull(keySerializationSchema); + this.valueSerializationSchema = checkNotNull(valueSerializationSchema); + checkNotNull(outputFile); + try { + this.bufferedOutputStream = + new BufferedOutputStream(new FileOutputStream(outputFile, true)); + } catch (FileNotFoundException e) { + throw new FlinkRuntimeException("Could not find file", e); + } + } + + @Override + public void write(IN element, Context context) { + byte[] key = keySerializationSchema.serialize(element); + byte[] value = valueSerializationSchema.serialize(element); + records.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + } + + @Override + public void flush(boolean endOfInput) throws IOException { + LOG.debug("Flushing records, endOfInput={}", endOfInput); Review Comment: Nit: Remove log statement? ########## flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link UpsertTestSinkWriter}. */ +@ExtendWith(TestLoggerExtension.class) +class UpsertTestSinkWriterITCase { + + @Test + public void testWrite(@TempDir File tempDir) throws Exception { + File outputFile = new File(tempDir, "records.out"); + SerializationSchema<Tuple2<String, String>> keySerializationSchema = + element -> element.f0.getBytes(); + SerializationSchema<Tuple2<String, String>> valueSerializationSchema = + element -> element.f1.getBytes(); + + try (final UpsertTestSinkWriter<Tuple2<String, String>> writer = + new UpsertTestSinkWriter<>( + outputFile, keySerializationSchema, valueSerializationSchema)) { + for (int i = 0; i < 10; i++) { + writer.write(Tuple2.of("Key #" + i, "Value #" + i), null); + } + } + + DeserializationSchema<String> keyDeserializationSchema = new StringDeserializationSchema(); + DeserializationSchema<String> valueDeserializationSchema = + new StringDeserializationSchema(); + + Map<String, String> resultMap = + BinaryFileUtil.readRecords( + outputFile, keyDeserializationSchema, valueDeserializationSchema); + + for (int i = 0; i < 10; i++) { + assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i); + } + } + + @Test + public void testWriteOnCheckpoint(@TempDir File tempDir) throws Exception { + File outputFile = new File(tempDir, "records.out"); + SerializationSchema<Tuple2<String, String>> keySerializationSchema = + element -> element.f0.getBytes(); + SerializationSchema<Tuple2<String, String>> valueSerializationSchema = + element -> element.f1.getBytes(); + + DeserializationSchema<String> keyDeserializationSchema = new StringDeserializationSchema(); + DeserializationSchema<String> valueDeserializationSchema = + new StringDeserializationSchema(); + + try (final UpsertTestSinkWriter<Tuple2<String, String>> writer = + new UpsertTestSinkWriter<>( + outputFile, keySerializationSchema, valueSerializationSchema)) { + for (int i = 0; i < 10; i++) { + writer.write(Tuple2.of("Key #" + i, "Value #" + i), null); + } + writer.flush(false); + + Map<String, String> resultMap = + BinaryFileUtil.readRecords( + outputFile, keyDeserializationSchema, valueDeserializationSchema); + + // after flushing the file should contain 
records + for (int i = 0; i < 10; i++) { + assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i); + } + + for (int i = 0; i < 5; i++) { + writer.write(Tuple2.of("Key #" + i, "Value #" + i * 10), null); + } + } + + Map<String, String> resultMap = + BinaryFileUtil.readRecords( + outputFile, keyDeserializationSchema, valueDeserializationSchema); + + for (int i = 0; i < 10; i++) { + if (i < 5) { + assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i * 10); + } else { + assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i); + } + } + } + + private static class StringDeserializationSchema extends AbstractDeserializationSchema<String> { Review Comment: Nit: Why not use `SimpleStringSchema`? ########## flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSinkITCase.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.table; + +import org.apache.flink.connector.upserttest.sink.BinaryFileUtil; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Tests for {@link UpsertTestDynamicTableSink}. */ +@ExtendWith(TestLoggerExtension.class) +class UpsertTestDynamicTableSinkITCase { Review Comment: Please use the `MiniClusterExtension` to prevent weird submission bugs (a registration sketch is included at the end of this thread). I also wonder why the ArchUnit tests pass. I guess they are missing in this module completely ;) ########## flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/sink/BinaryFileUtil.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Collection of utility methods for tests using the {@link UpsertTestSink}. */ +@Internal +public class BinaryFileUtil { Review Comment: I think this should be a `SimpleVersionedSerializer` since that is the main entry for these kinds of operations in the Flink code base. For reading/writing many records you can take a look at https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java ########## flink-connectors/flink-connector-upsert-test/src/main/java/org/apache/flink/connector/upserttest/table/UpsertTestDynamicTableSink.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.upserttest.sink.UpsertTestSink; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.io.File; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DynamicTableSink} that describes how to create a {@link UpsertTestSink} from a logical + * description.
+ */ +@Internal +class UpsertTestDynamicTableSink implements DynamicTableSink { + + private final DataType physicalRowDataType; + private final EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat; + private final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat; + private final String outputFilePath; + + UpsertTestDynamicTableSink( + DataType physicalRowDataType, + EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat, + EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, + String outputFilePath) { + this.physicalRowDataType = checkNotNull(physicalRowDataType); + this.keyEncodingFormat = checkNotNull(keyEncodingFormat); + this.valueEncodingFormat = checkNotNull(valueEncodingFormat); + this.outputFilePath = checkNotNull(outputFilePath); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + if (kind != RowKind.UPDATE_BEFORE) { + builder.addContainedKind(kind); + } + } + return builder.build(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + final File outputFile = new File(outputFilePath); + final SerializationSchema<RowData> keySerialization = + keyEncodingFormat.createRuntimeEncoder(context, physicalRowDataType); + final SerializationSchema<RowData> valueSerialization = + valueEncodingFormat.createRuntimeEncoder(context, physicalRowDataType); + + final UpsertTestSink<RowData> sink = + UpsertTestSink.<RowData>builder() + .setOutputFile(outputFile) + .setKeySerializationSchema(keySerialization) + .setValueSerializationSchema(valueSerialization) + .build(); + return SinkV2Provider.of(sink, 1); + } + + @Override + public DynamicTableSink copy() { + return new UpsertTestDynamicTableSink( + physicalRowDataType, keyEncodingFormat, valueEncodingFormat, outputFilePath); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UpsertTestDynamicTableSink that = (UpsertTestDynamicTableSink) o; + return Objects.equals(physicalRowDataType, that.physicalRowDataType) + && Objects.equals(keyEncodingFormat, that.keyEncodingFormat) + && Objects.equals(valueEncodingFormat, that.valueEncodingFormat) + && Objects.equals(outputFilePath, that.outputFilePath); + } + + @Override + public int hashCode() { + return Objects.hash( + physicalRowDataType, keyEncodingFormat, valueEncodingFormat, outputFilePath); + } + + @Override + public String asSummaryString() { + return "UpsertFileSink"; Review Comment: Nit: ```suggestion return "UpsertTestFileSink"; ``` ########## flink-connectors/flink-connector-upsert-test/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.TestLoggerExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link UpsertTestSinkWriter}. */ +@ExtendWith(TestLoggerExtension.class) +class UpsertTestSinkWriterITCase { + + @Test + public void testWrite(@TempDir File tempDir) throws Exception { + File outputFile = new File(tempDir, "records.out"); + SerializationSchema<Tuple2<String, String>> keySerializationSchema = + element -> element.f0.getBytes(); + SerializationSchema<Tuple2<String, String>> valueSerializationSchema = + element -> element.f1.getBytes(); + + try (final UpsertTestSinkWriter<Tuple2<String, String>> writer = + new UpsertTestSinkWriter<>( + outputFile, keySerializationSchema, valueSerializationSchema)) { + for (int i = 0; i < 10; i++) { + writer.write(Tuple2.of("Key #" + i, "Value #" + i), null); + } + } + + DeserializationSchema<String> keyDeserializationSchema = new StringDeserializationSchema(); + DeserializationSchema<String> valueDeserializationSchema = + new StringDeserializationSchema(); + + Map<String, String> resultMap = + BinaryFileUtil.readRecords( + outputFile, keyDeserializationSchema, valueDeserializationSchema); + + for (int i = 0; i < 10; i++) { + assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i); + } + } + + @Test + public void testWriteOnCheckpoint(@TempDir File tempDir) throws Exception { Review Comment: Can you extract the common code for both tests? In the end, the setup and assertions are identical; the only difference is that one scenario verifies that all records are flushed on close, while the other verifies the behavior on flush. It might also make sense to have only one parameterized test based on the operation of the `UpsertTestSinkWriter` (see the sketch below).
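A rough sketch of the single parameterized test suggested here, assuming `junit-jupiter-params` is available, that the shared setup is extracted into helpers (`createWriter` and `assertRecordsPresent` are illustrative names, not code from the PR), and that `SimpleStringSchema` replaces the hand-rolled deserializer as suggested earlier:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;

import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.File;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class UpsertTestSinkWriterParameterizedTest {

    enum WriterOperation { FLUSH, CLOSE }

    @ParameterizedTest
    @EnumSource(WriterOperation.class)
    void recordsBecomeVisible(WriterOperation operation, @TempDir File tempDir) throws Exception {
        File outputFile = new File(tempDir, "records.out");
        UpsertTestSinkWriter<Tuple2<String, String>> writer = createWriter(outputFile);
        for (int i = 0; i < 10; i++) {
            writer.write(Tuple2.of("Key #" + i, "Value #" + i), null);
        }
        // Both operations are expected to persist the buffered records.
        if (operation == WriterOperation.FLUSH) {
            writer.flush(false);
        } else {
            writer.close();
        }
        assertRecordsPresent(outputFile, 10);
        if (operation == WriterOperation.FLUSH) {
            writer.close(); // release the underlying stream in the flush scenario too
        }
    }

    private static UpsertTestSinkWriter<Tuple2<String, String>> createWriter(File outputFile) {
        SerializationSchema<Tuple2<String, String>> keySchema = element -> element.f0.getBytes();
        SerializationSchema<Tuple2<String, String>> valueSchema = element -> element.f1.getBytes();
        return new UpsertTestSinkWriter<>(outputFile, keySchema, valueSchema);
    }

    private static void assertRecordsPresent(File outputFile, int expectedCount) throws Exception {
        // Reuses the existing read path, with SimpleStringSchema on both sides.
        Map<String, String> resultMap =
                BinaryFileUtil.readRecords(
                        outputFile, new SimpleStringSchema(), new SimpleStringSchema());
        for (int i = 0; i < expectedCount; i++) {
            assertThat(resultMap).containsEntry("Key #" + i, "Value #" + i);
        }
    }
}
```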

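And, going back to the `MiniClusterExtension` remark on `UpsertTestDynamicTableSinkITCase` above: a sketch of the registration, assuming the JUnit 5 extension shipped in `flink-test-utils`; the parallelism values are placeholders.

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.extension.RegisterExtension;

class UpsertTestDynamicTableSinkITCase {

    // Shares one MiniCluster across all tests in the class so that jobs are
    // submitted against it instead of ad-hoc local environments.
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());
}
```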