fapaul commented on code in PR #19655:
URL: https://github.com/apache/flink/pull/19655#discussion_r877902446
##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/ImmutableByteArrayWrapper.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is a slim wrapper around {@code byte[]} that defensively copies the array during
+ * construction and retrieval to prevent its contents from changing. It also implements a
+ * content-dependent {@link #hashCode} method to allow usage as keys in a {@link Map}.
+ */
+class ImmutableByteArrayWrapper {
+
+    @VisibleForTesting final byte[] bytes;
+
+    ImmutableByteArrayWrapper(byte[] bytes) {
+        checkNotNull(bytes);
+        this.bytes = bytes.clone();
+    }
+
+    /**
+     * Returns a reference-free copy of the underlying byte[].
+     *
+     * @return the copied byte[]
+     */
+    byte[] array() {
+        return bytes.clone();

Review Comment:
   Nit: Is it necessary to `copy` again?
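As an aside, a minimal sketch of why the second copy matters. The demo class below is hypothetical, and it assumes the wrapper also overrides `equals` consistently with the content-dependent `hashCode` described in its Javadoc. Without the clone in `array()`, a caller could mutate the returned array and silently change the hash of a key that is already stored in a `HashMap`:

```java
package org.apache.flink.connector.upserttest.sink;

import java.util.HashMap;
import java.util.Map;

// Hypothetical demo, placed in the same package to reach the package-private wrapper.
class DefensiveCopyDemo {
    public static void main(String[] args) {
        ImmutableByteArrayWrapper wrapper = new ImmutableByteArrayWrapper("key".getBytes());

        Map<ImmutableByteArrayWrapper, String> map = new HashMap<>();
        map.put(wrapper, "value");

        // Harmless here: only the returned clone is mutated. Without the second copy,
        // this line would change the stored key's bytes, and with them its
        // content-dependent hashCode, while the wrapper is in use as a map key.
        wrapper.array()[0] = 42;

        // Still found, because the stored key's bytes were never touched.
        System.out.println(map.get(new ImmutableByteArrayWrapper("key".getBytes())));
    }
}
```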
##########
flink-test-utils-parent/flink-test-utils/pom.xml:
##########
@@ -35,6 +35,12 @@ under the License.
 	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>

Review Comment:
   I have two questions about this dependency:
   1. Why is it defined in the parent pom of the test-utils?
   2. I guess we need it to build the sink, but can you quickly check whether it is really needed?

##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSink.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.flink.connector.upserttest.sink.UpsertTestSinkWriter.MAGIC_BYTE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to upsert test data into a file. This Sink is intended for testing internal
+ * functionality and is **not** production-ready.
+ *
+ * <p>Please note that the UpsertTestSink needs to run with a parallelism of 1 to function
+ * correctly. There is currently no support for using multiple writers at once.
+ *
+ * @param <IN> type of records written to the file
+ * @see UpsertTestSinkBuilder on how to construct an UpsertTestSink
+ */
+@PublicEvolving
+public class UpsertTestSink<IN> implements Sink<IN> {
+
+    private final File outputFile;
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+
+    UpsertTestSink(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.outputFile = checkNotNull(outputFile);
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+    }
+
+    /**
+     * Create a {@link UpsertTestSinkBuilder} to construct a new {@link UpsertTestSink}.
+     *
+     * @param <IN> type of incoming records
+     * @return {@link UpsertTestSinkBuilder}
+     */
+    public static <IN> UpsertTestSinkBuilder<IN> builder() {
+        return new UpsertTestSinkBuilder<>();
+    }
+
+    @Internal
+    @Override
+    public SinkWriter<IN> createWriter(InitContext context) {
+        return new UpsertTestSinkWriter<>(
+                outputFile, keySerializationSchema, valueSerializationSchema);
+    }
+
+    /////////////////////////////////////////////////////////////
+    // Utilities
+    /////////////////////////////////////////////////////////////
+
+    /**
+     * Returns the total number of records written using the {@link UpsertTestSinkWriter} to the
+     * given File.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return the number of records
+     * @throws IOException
+     */
+    public static int getNumberOfRecords(BufferedInputStream bis) throws IOException {
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = readRecords(bis);
+        return records.size();
+    }
+
+    /**
+     * Returns the total number of records written using the {@link UpsertTestSinkWriter} to the
+     * given File.
+     *
+     * @param file The File to read from
+     * @return the number of records
+     * @throws IOException
+     */
+    public static int getNumberOfRecords(File file) throws IOException {
+        checkNotNull(file);
+        FileInputStream fs = new FileInputStream(file);
+        BufferedInputStream bis = new BufferedInputStream(fs);
+        return getNumberOfRecords(bis);
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream and converts them using the provided {@link DeserializationSchema}s.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @param keyDeserializationSchema The key's DeserializationSchema
+     * @param valueDeserializationSchema The value's DeserializationSchema
+     * @return Map containing the deserialized key-value pairs
+     * @throws IOException
+     */
+    public static <K, V> Map<K, V> readRecords(
+            BufferedInputStream bis,
+            DeserializationSchema<K> keyDeserializationSchema,
+            DeserializationSchema<V> valueDeserializationSchema)
+            throws IOException {
+        checkNotNull(bis);
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> bytesMap = readRecords(bis);
+        Map<K, V> typedMap = new HashMap<>(bytesMap.size());
+
+        Iterator<Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper>> it =
+                bytesMap.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> entry = it.next();
+            K key = keyDeserializationSchema.deserialize(entry.getKey().array());
+            V value = valueDeserializationSchema.deserialize(entry.getValue().array());
+            typedMap.put(key, value);
+        }
+        return typedMap;
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given File
+     * and converts them using the provided {@link DeserializationSchema}s.
+     *
+     * @param file The File to read from
+     * @param keyDeserializationSchema The key's DeserializationSchema
+     * @param valueDeserializationSchema The value's DeserializationSchema
+     * @return Map containing the deserialized key-value pairs
+     * @throws IOException
+     */
+    public static <K, V> Map<K, V> readRecords(
+            File file,
+            DeserializationSchema<K> keyDeserializationSchema,
+            DeserializationSchema<V> valueDeserializationSchema)
+            throws IOException {
+        checkNotNull(file);
+        FileInputStream fs = new FileInputStream(file);
+        BufferedInputStream bis = new BufferedInputStream(fs);
+        return readRecords(bis, keyDeserializationSchema, valueDeserializationSchema);
+    }
+
+    /**
+     * Reads records that were written using the {@link UpsertTestSinkWriter} from the given
+     * InputStream.
+     *
+     * @param bis The BufferedInputStream to read from
+     * @return Map containing the read ImmutableByteArrayWrapper key-value pairs
+     * @throws IOException
+     */
+    private static Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> readRecords(
+            BufferedInputStream bis) throws IOException {
+        checkNotNull(bis);
+        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>();
+        int magicByte;
+        while ((magicByte = bis.read()) != -1) {

Review Comment:
   Can you explain the `while` loop here? I would have thought that reading the magic byte followed by the content once would be enough.
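For context on what the loop has to handle: `UpsertTestSinkWriter` opens the file in append mode and writes its current map on every flush/checkpoint, so the file holds many concatenated batches of records and a reader must loop until EOF, with later occurrences of a key overwriting earlier ones. A minimal sketch, assuming a length-prefixed record layout (the actual on-disk format is not visible in this hunk, so the field layout is an assumption):

```java
package org.apache.flink.connector.upserttest.sink;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class ReadLoopSketch {
    // Assumed record layout: [MAGIC_BYTE][key length][key bytes][value length][value bytes].
    static Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> readAll(
            BufferedInputStream bis) throws IOException {
        Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>();
        DataInputStream in = new DataInputStream(bis);
        int magicByte;
        // Loop until EOF (read() == -1): every flush appended another batch, and
        // re-putting a key from a later batch overwrites the older value, which is
        // exactly the upsert semantics the sink promises.
        while ((magicByte = in.read()) != -1) {
            if (magicByte != UpsertTestSinkWriter.MAGIC_BYTE) {
                throw new IOException("Corrupt record: unexpected magic byte " + magicByte);
            }
            byte[] key = new byte[in.readInt()];
            in.readFully(key);
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
        }
        return records;
    }
}
```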
##########
flink-tests/src/test/java/org/apache/flink/connector/upserttest/sink/ImmutableByteArrayWrapperTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link ImmutableByteArrayWrapper}. */
+@ExtendWith(TestLoggerExtension.class)
+class ImmutableByteArrayWrapperTest {
+
+    @Test
+    public void testConstructorCopy() {
+        byte[] array = "immutability of constructor".getBytes();
+        byte[] clonedArray = new ImmutableByteArrayWrapper(array).bytes;
+        assertCopyIsReferenceFree(array, clonedArray);
+    }
+
+    @Test
+    public void testGetterCopy() {
+        byte[] array = "immutability of getter".getBytes();
+        byte[] clonedArray = new ImmutableByteArrayWrapper(array).array();
+        assertCopyIsReferenceFree(array, clonedArray);
+    }
+
+    void assertCopyIsReferenceFree(byte[] original, byte[] clone) {

Review Comment:
   private static?

##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriter.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible for writing records into a file in an upsert-fashion. On every
+ * checkpoint each key-value pair currently in the map is written to the file.
+ *
+ * @param <IN> The type of the input elements.
+ */
+class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
+    static final byte MAGIC_BYTE = 13;
+
+    private final SerializationSchema<IN> keySerializationSchema;
+    private final SerializationSchema<IN> valueSerializationSchema;
+    private final Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records =
+            new HashMap<>();
+    private final BufferedOutputStream bufferedOutputStream;
+
+    UpsertTestSinkWriter(
+            File outputFile,
+            SerializationSchema<IN> keySerializationSchema,
+            SerializationSchema<IN> valueSerializationSchema) {
+        this.keySerializationSchema = checkNotNull(keySerializationSchema);
+        this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
+        checkNotNull(outputFile);
+        try {
+            this.bufferedOutputStream =
+                    new BufferedOutputStream(new FileOutputStream(outputFile, true));
+        } catch (FileNotFoundException e) {
+            throw new FlinkRuntimeException("Could not find file", e);
+        }
+    }
+
+    @Override
+    public void write(IN element, Context context) {
+        byte[] key = keySerializationSchema.serialize(element);
+        byte[] value = valueSerializationSchema.serialize(element);
+        records.put(new ImmutableByteArrayWrapper(key), new ImmutableByteArrayWrapper(value));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException {
+        writeRecords(bufferedOutputStream, records);
+        records.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        flush(true);
+        bufferedOutputStream.close();
+    }
+
+    private static void writeRecords(

Review Comment:
   Nit: I am not sure I like the current approach where the writing and reading code are no longer part of the same class. Sharing the `MAGIC_BYTE` already seems hard to track.
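A sketch of one way to address this, keeping the on-disk format in a single class that both the writer and the reader delegate to, so `MAGIC_BYTE` never leaves it. The class name, method names, and record layout below are all hypothetical:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical single owner of the upsert file format. */
final class UpsertTestFileFormat {
    static final byte MAGIC_BYTE = 13;

    private UpsertTestFileFormat() {}

    /** Writes one record as [MAGIC_BYTE][key length][key][value length][value]. */
    static void writeRecord(DataOutputStream out, byte[] key, byte[] value) throws IOException {
        out.writeByte(MAGIC_BYTE);
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(value.length);
        out.write(value);
    }

    /** Reads one record as a {key, value} pair, or returns null at EOF. */
    static byte[][] readRecord(DataInputStream in) throws IOException {
        int magicByte = in.read();
        if (magicByte == -1) {
            return null; // end of file
        }
        if (magicByte != MAGIC_BYTE) {
            throw new IOException("Corrupt record: unexpected magic byte " + magicByte);
        }
        byte[] key = new byte[in.readInt()];
        in.readFully(key);
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        return new byte[][] {key, value};
    }
}
```

With something like this, `UpsertTestSinkWriter.writeRecords` and `UpsertTestSink.readRecords` would share one definition of the format instead of a package-private constant.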
##########
flink-tests/src/test/java/org/apache/flink/connector/upserttest/sink/UpsertTestSinkWriterITCase.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.upserttest.sink;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UpsertTestSinkWriter}. */
+@ExtendWith(TestLoggerExtension.class)
+class UpsertTestSinkWriterITCase {
+
+    @RegisterExtension
+    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    @TempDir private File tempDir;
+    private File outputFile;
+    private UpsertTestSinkWriter<Tuple2<String, String>> writer;
+    private List<Tuple2<String, String>> expectedRecords;
+
+    @BeforeEach
+    public void setup() {
+        outputFile = new File(tempDir, "records.out");
+        writer = createSinkWriter(outputFile);
+        expectedRecords = writeTestData(writer);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        writer.close();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        writer.close();
+        testRecordPresence(outputFile, expectedRecords);
+    }
+
+    @Test
+    public void testWriteOnCheckpoint() throws Exception {
+        writer.flush(false);
+        testRecordPresence(outputFile, expectedRecords);
+    }
+
+    UpsertTestSinkWriter<Tuple2<String, String>> createSinkWriter(File outputFile) {
+        SerializationSchema<Tuple2<String, String>> keySerializationSchema =
+                element -> element.f0.getBytes();
+        SerializationSchema<Tuple2<String, String>> valueSerializationSchema =
+                element -> element.f1.getBytes();
+
+        return new UpsertTestSinkWriter<>(
+                outputFile, keySerializationSchema, valueSerializationSchema);
+    }
+
+    List<Tuple2<String, String>> writeTestData(
+            UpsertTestSinkWriter<Tuple2<String, String>> writer) {
+        final List<Tuple2<String, String>> expectedRecords = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            Tuple2<String, String> record = Tuple2.of("Key #" + i, "Value #" + i);
+            expectedRecords.add(record);
+            writer.write(record, null);
+        }
+        return expectedRecords;
+    }
+
+    void testRecordPresence(File outputFile, List<Tuple2<String, String>> expectedRecords)

Review Comment:
   private?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
