waterlx commented on a change in pull request #856: URL: https://github.com/apache/incubator-iceberg/pull/856#discussion_r415588989
########## File path: flink/src/main/java/org/apache/iceberg/flink/connector/sink/IcebergSinkAppender.java ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.connector.sink; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Append Iceberg sink to DataStream + * + * @param <IN> input data type + */ +@SuppressWarnings({"checkstyle:ClassTypeParameterName", "checkstyle:HiddenField"}) +public class IcebergSinkAppender<IN> { + private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkAppender.class); + + private final Table table; + private final Configuration config; + private final String sinkName; + private RecordSerializer<IN> serializer; + private Integer writerParallelism; + + public IcebergSinkAppender(Table table, Configuration config, String sinkName) { + this.table = table; + this.config = config; + this.sinkName = sinkName; + } + + /** + * Required. + * + * @param serializer Serialize input data type to Iceberg {@link Record} + */ + public IcebergSinkAppender<IN> withSerializer(RecordSerializer<IN> serializer) { + this.serializer = serializer; + return this; + } + + /** + * Optional. Explicitly set the parallelism for Iceberg writer operator. + * Otherwise, default job parallelism is used. + */ + public IcebergSinkAppender<IN> withWriterParallelism(Integer writerParallelism) { + this.writerParallelism = writerParallelism; + return this; + } + + /** + * @param dataStream append sink to this DataStream + */ + public DataStreamSink<FlinkDataFile> append(DataStream<IN> dataStream) { + // TODO: need to return? + IcebergWriter<IN> writer = new IcebergWriter<>(table, serializer, config); + IcebergCommitter committer = new IcebergCommitter(table, config); + + final String writerId = sinkName + "-writer"; + SingleOutputStreamOperator<FlinkDataFile> writerStream = dataStream + .transform(writerId, TypeInformation.of(FlinkDataFile.class), writer) // IcebergWriter as stream operator + .uid(writerId); + if (null != writerParallelism && writerParallelism > 0) { + LOG.info("Set Iceberg writer parallelism to {}", writerParallelism); + writerStream.setParallelism(writerParallelism); Review comment: @JingsongLi Got you idea. Do you mean that if user does not set parallelism of writer explicitly, the logic could set it to the parallelism of the input stream of writer? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org