alpreu commented on code in PR #19655: URL: https://github.com/apache/flink/pull/19655#discussion_r877948231
########## flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/connector/upserttest/sink/UpsertTestSink.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.upserttest.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.apache.flink.connector.upserttest.sink.UpsertTestSinkWriter.MAGIC_BYTE; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Flink Sink to upsert test data into a file. This Sink is intended for testing internal + * functionality and is **not** production-ready. 
+ * + * <p>Please note that the UpsertTestSink needs to run with a parallelism of 1 to function + * correctly. There is currently no support for using multiple writers at once. + * + * @param <IN> type of records written to the file + * @see UpsertTestSinkBuilder on how to construct an UpsertTestSink + */ +@PublicEvolving +public class UpsertTestSink<IN> implements Sink<IN> { + + private final File outputFile; + private final SerializationSchema<IN> keySerializationSchema; + private final SerializationSchema<IN> valueSerializationSchema; + + UpsertTestSink( + File outputFile, + SerializationSchema<IN> keySerializationSchema, + SerializationSchema<IN> valueSerializationSchema) { + this.outputFile = checkNotNull(outputFile); + this.keySerializationSchema = checkNotNull(keySerializationSchema); + this.valueSerializationSchema = checkNotNull(valueSerializationSchema); + } + + /** + * Create a {@link UpsertTestSinkBuilder} to construct a new {@link UpsertTestSink}. + * + * @param <IN> type of incoming records + * @return {@link UpsertTestSinkBuilder} + */ + public static <IN> UpsertTestSinkBuilder<IN> builder() { + return new UpsertTestSinkBuilder<>(); + } + + @Internal + @Override + public SinkWriter<IN> createWriter(InitContext context) { + return new UpsertTestSinkWriter<>( + outputFile, keySerializationSchema, valueSerializationSchema); + } + + ///////////////////////////////////////////////////////////// + // Utilities + ///////////////////////////////////////////////////////////// + + /** + * Returns the total number of records written using the {@link UpsertTestSinkWriter} to the + * given File. 
+ * + * @param bis The BufferedInputStream to read from + * @return the number of records + * @throws IOException + */ + public static int getNumberOfRecords(BufferedInputStream bis) throws IOException { + Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = readRecords(bis); + return records.size(); + } + + /** + * Returns the total number of records written using the {@link UpsertTestSinkWriter} to the + * given File. + * + * @param file The File to read from + * @return the number of records + * @throws IOException + */ + public static int getNumberOfRecords(File file) throws IOException { + checkNotNull(file); + FileInputStream fs = new FileInputStream(file); + BufferedInputStream bis = new BufferedInputStream(fs); + return getNumberOfRecords(bis); + } + + /** + * Reads records that were written using the {@link UpsertTestSinkWriter} from the given + * InputStream and converts them using the provided {@link DeserializationSchema}s. + * + * @param bis The BufferedInputStream to read from + * @param keyDeserializationSchema The key's DeserializationSchema + * @param valueDeserializationSchema The value's DeserializationSchema + * @return Map containing the deserialized key-value pairs + * @throws IOException + */ + public static <K, V> Map<K, V> readRecords( + BufferedInputStream bis, + DeserializationSchema<K> keyDeserializationSchema, + DeserializationSchema<V> valueDeserializationSchema) + throws IOException { + checkNotNull(bis); + Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> bytesMap = readRecords(bis); + Map<K, V> typedMap = new HashMap<>(bytesMap.size()); + + Iterator<Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper>> it = + bytesMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> entry = it.next(); + K key = keyDeserializationSchema.deserialize(entry.getKey().array()); + V value = valueDeserializationSchema.deserialize(entry.getValue().array()); + 
+ typedMap.put(key, value); + } + return typedMap; + } + + /** + * Reads records that were written using the {@link UpsertTestSinkWriter} from the given File + * and converts them using the provided {@link DeserializationSchema}s. + * + * @param file The File to read from + * @param keyDeserializationSchema The key's DeserializationSchema + * @param valueDeserializationSchema The value's DeserializationSchema + * @return Map containing the deserialized key-value pairs + * @throws IOException + */ + public static <K, V> Map<K, V> readRecords( + File file, + DeserializationSchema<K> keyDeserializationSchema, + DeserializationSchema<V> valueDeserializationSchema) + throws IOException { + checkNotNull(file); + FileInputStream fs = new FileInputStream(file); + BufferedInputStream bis = new BufferedInputStream(fs); + return readRecords(bis, keyDeserializationSchema, valueDeserializationSchema); + } + + /** + * Reads records that were written using the {@link UpsertTestSinkWriter} from the given + * InputStream. + * + * @param bis The BufferedInputStream to read from + * @return Map containing the read ImmutableByteArrayWrapper key-value pairs + * @throws IOException + */ + private static Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> readRecords( + BufferedInputStream bis) throws IOException { + checkNotNull(bis); + Map<ImmutableByteArrayWrapper, ImmutableByteArrayWrapper> records = new HashMap<>(); + int magicByte; + while ((magicByte = bis.read()) != -1) { Review Comment: We are reading individual records from the file, so we need to keep iterating until none are left (i.e., until `read()` returns -1 at the end of the stream). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
