openinx commented on a change in pull request #1145: URL: https://github.com/apache/iceberg/pull/1145#discussion_r449967218
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  private IcebergStreamWriter(String tablePath, Schema readSchema, Configuration conf) {
+    this.tablePath = tablePath;
+    this.conf = new SerializableConfiguration(conf);
+    this.readSchema = readSchema;
+  }
+
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());

Review comment:
Fine, let's keep the rule.

########## File path: core/src/main/java/org/apache/iceberg/BaseFile.java ##########
@@ -360,7 +360,7 @@ public ByteBuffer keyMetadata() {
     if (list != null) {
       List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
       copy.addAll(list);
-      return Collections.unmodifiableList(copy);

Review comment:
OK, I didn't know there's a way to specify a customized serializer. Let me check how to handle this.
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  private void emit(DataFile dataFile) {
+    output.collect(new StreamRecord<>(new SerializableDataFile(dataFile)));
+  }
+
+  private FileFormat getFileFormat() {

Review comment:
Here we don't create a file format, we just parse the `FileFormat` from the table properties, so `get` sounds good to me.
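For reference, a minimal sketch of that kind of property lookup. This is an assumption based on the `TableProperties` constants imported at the top of `IcebergStreamWriter`, not necessarily the exact body in this PR:

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.FileFormat;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

public class FileFormatLookupSketch {

  // Parses the file format name from the table properties; nothing is created here.
  static FileFormat parseFileFormat(Map<String, String> tableProperties) {
    String formatString = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
  }

  public static void main(String[] args) {
    System.out.println(parseFileFormat(Collections.singletonMap(DEFAULT_FILE_FORMAT, "avro")));  // AVRO
    System.out.println(parseFileFormat(Collections.emptyMap()));  // PARQUET, the default
  }
}
```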
########## File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java ##########
@@ -0,0 +1,54 @@
+abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+
+  protected DataFile closeFileAppender(FileAppender<T> fileAppender, EncryptedOutputFile currentFile,
+                                       PartitionSpec spec, StructLike partitionKey) throws IOException {
+    // Close the file appender firstly.
+    fileAppender.close();
+
+    // metrics are only valid after the appender is closed.
+    Metrics metrics = fileAppender.metrics();
+    long fileSizeInBytes = fileAppender.length();
+    List<Long> splitOffsets = fileAppender.splitOffsets();
+
+    // Construct the DataFile and add it into the completeDataFiles.

Review comment:
It's true, let's remove it.
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  @Override
+  public void open() {
+    this.table = TableUtil.findTable(tablePath, conf.get());
+    if (this.readSchema != null) {
+      // reassign ids to match the existing table schema
+      readSchema = TypeUtil.reassignIds(readSchema, table.schema());

Review comment:
Because the `readSchema` was converted from the user-provided flink table schema, its field ids do not match those of the iceberg table being written. The following `validateWriteSchema` would then regard the two schemas as incompatible because of the differing ids, for example:
```
Cannot write incompatible dataset to table with schema:
table {
  0: id: optional int
  1: data: optional string
}
write schema: table {
  1: id: optional int
  2: data: optional string
}
Problems:
* data: int cannot be promoted to string

java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  0: id: optional int
  1: data: optional string
}
write schema: table {
  1: id: optional int
  2: data: optional string
}
Problems:
* data: int cannot be promoted to string
	at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:216)
	at org.apache.iceberg.flink.IcebergStreamWriter.open(IcebergStreamWriter.java:97)
	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:558)
	at org.apache.iceberg.flink.TestIcebergStreamWriter.testWritingTable(TestIcebergStreamWriter.java:94)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)
```
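To make the id mismatch concrete, here is a small self-contained sketch. The two-column schemas are hypothetical, and the `validateWriteSchema` call simply mirrors the argument order used in `open()` above:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class ReassignIdsSketch {
  public static void main(String[] args) {
    // Schema of the existing iceberg table: field ids were assigned at table creation.
    Schema tableSchema = new Schema(
        Types.NestedField.optional(0, "id", Types.IntegerType.get()),
        Types.NestedField.optional(1, "data", Types.StringType.get()));

    // Schema converted from the flink TableSchema: same names, freshly assigned ids.
    Schema readSchema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Matching by id, field 1 would mean "id" in the table but "data" in the read schema,
    // which is exactly the "int cannot be promoted to string" failure above.
    // Reassigning ids by name first makes the comparison line up.
    Schema reassigned = TypeUtil.reassignIds(readSchema, tableSchema);
    TypeUtil.validateWriteSchema(reassigned, tableSchema, true, true);  // same argument order as open()
  }
}
```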
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  /**
+   * Be careful to do the initialization in this constructor, because in {@link DataStream#addSink(SinkFunction)}

Review comment:
Yes, it's a code comment and we expect that `IcebergStreamWriter` won't be exposed to upper-layer iceberg users, because we will have a wrapper that hides the implementation details of `IcebergStreamWriter` and `IcebergFilesCommitter` and only exposes the `DataStream<Row>` to the end user. I can remove this comment if you guys think it's OK.

NOTE: there will be a follow-up patch that implements `IcebergFilesCommitter`, which collects all the data files emitted by `IcebergStreamWriter` and commits the iceberg transaction with a parallelism of one.
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  private void emit(DataFile dataFile) {
+    output.collect(new StreamRecord<>(new SerializableDataFile(dataFile)));
+  }
+
+  private FileFormat getFileFormat() {

Review comment:
Well, let me consider this.
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    LOG.info("Iceberg writer {} subtask {} begin preparing for checkpoint {}", tablePath, subTaskId, checkpointId);
+    // close all open files and emit files to downstream committer operator
+    writer.close();
+    for (DataFile dataFile : writer.getCompleteFiles()) {

Review comment:
Well, sounds good to me.
########## File path: flink/src/main/java/org/apache/iceberg/flink/PartitionKey.java ##########
@@ -0,0 +1,126 @@
+  public static class Builder {
+    private final int size;
+
+    private final int[] pos;
+    private final Transform[] transforms;
+
+    private Builder(PartitionSpec spec) {
+      List<PartitionField> fields = spec.fields();
+      this.size = fields.size();
+      this.pos = new int[size];
+      this.transforms = new Transform[size];
+
+      Map<Integer, Integer> fieldId2Pos = buildFieldId2PosMap(spec.schema());

Review comment:
In my view, the `PartitionSpec` will only use root-level fields, so I simplified the `accessor` into `buildFieldId2PosMap`... I'm not quite sure whether we need the more complex tree traversal, let me take a deeper look.
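To make the open question concrete, here is a small sketch with a hypothetical schema, reusing the same flat mapping as `buildFieldId2PosMap`; it shows the case that a root-level-only map cannot cover:

```java
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;

public class NestedPartitionSourceSketch {

  // Same flat mapping as buildFieldId2PosMap: only root-level fields are indexed.
  static Map<Integer, Integer> buildFieldId2PosMap(Schema schema) {
    Map<Integer, Integer> fieldId2Position = Maps.newHashMap();
    List<Types.NestedField> fields = schema.asStruct().fields();
    for (int i = 0; i < fields.size(); i++) {
      fieldId2Position.put(fields.get(i).fieldId(), i);
    }
    return fieldId2Position;
  }

  public static void main(String[] args) {
    // A hypothetical schema whose candidate partition source column ("event.ts", id 3) is nested.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "event", Types.StructType.of(
            Types.NestedField.optional(3, "ts", Types.TimestampType.withZone()))));

    // The flat map only sees ids 1 and 2; locating id 3 would need a tree traversal
    // (or accessors), which is the question raised in the review.
    System.out.println(buildFieldId2PosMap(schema).containsKey(3));  // prints: false
  }
}
```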
########## File path: flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java ##########
@@ -0,0 +1,225 @@
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();

Review comment:
@rdblue, I guess the real question you are asking is: here we close the writer without emitting the data files to downstream, so will it break the `exactly-once` semantics? Let's take an example. We have the record stream:
```
1 2 3 4 5 6 7 8 9 10 11 12 ...
```
and the emitted data file list would be:
```
datafile0: 1,2,3
datafile1: 4,5,6
datafile2: 7,8,9
datafile3: 10,11,12
```
Assuming that the checkpoint happens between 6 and 7, we will emit `datafile0` and `datafile1` to the downstream committer operator. If we close the writer (exceptionally or intentionally) after record 11, then we will emit `datafile2` to the downstream operator but ignore the still-open `datafile3`. This does not break the `exactly-once` semantics, because the snapshot records that `IcebergStreamWriter` has processed records `[1, 6]` and the snapshot state of the downstream operator contains the data files `datafile0, datafile1`. Once recovered, we will replay records 7, 8, 9, 10, 11, 12, ..., so `datafile2` and `datafile3` will be re-generated and re-emitted to the downstream operator; that is why we don't need to emit the data files downstream when closing. Besides, even if we did emit `datafile3` to the downstream operator in the `close` method, it would be discarded when recovering. (I provided a unit test to address this case.)
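A tiny toy model of that argument in plain Java (not iceberg code): files hold three records each, the checkpoint covers records 1..6, and recovery replays records 7..12; the recovered file list still contains every data file exactly once:

```java
import java.util.ArrayList;
import java.util.List;

public class ExactlyOnceReplaySketch {

  // Groups records into "data files" of three and returns the files that were completed.
  static List<String> writeFiles(int from, int to, List<String> completed) {
    List<Integer> open = new ArrayList<>();
    for (int record = from; record <= to; record++) {
      open.add(record);
      if (open.size() == 3) {
        completed.add("datafile" + open);
        open.clear();
      }
    }
    // Anything left in `open` models the in-progress file that close() simply drops.
    return completed;
  }

  public static void main(String[] args) {
    // First attempt: the checkpoint taken after record 6 covers datafile[1, 2, 3] and datafile[4, 5, 6].
    List<String> checkpointed = writeFiles(1, 6, new ArrayList<>());

    // The failed attempt also emitted datafile[7, 8, 9] before crashing after record 11, but that
    // emission is discarded on restore; replaying records 7..12 regenerates it along with the rest.
    List<String> recovered = writeFiles(7, 12, new ArrayList<>(checkpointed));

    System.out.println(recovered);
    // [datafile[1, 2, 3], datafile[4, 5, 6], datafile[7, 8, 9], datafile[10, 11, 12]]
  }
}
```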
########## File path: flink/src/main/java/org/apache/iceberg/flink/TableUtil.java ##########
@@ -0,0 +1,44 @@
+class TableUtil {
+
+  private TableUtil() {
+  }
+
+  static Table findTable(String path, Configuration conf) {

Review comment:
@JingsongLi it seems we could work on those things together. I will focus on the streaming writer and keep the simple `findTable` here, and you could provide a pull request that integrates the iceberg catalog with the flink catalog in the `buildIcebergCatalog` way (if you want).
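For context, a minimal sketch of what such a `findTable` could look like; the "path contains a slash means hadoop table location" rule is an illustrative assumption, not necessarily the rule used in this PR:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;

class TableUtilSketch {
  private TableUtilSketch() {
  }

  static Table findTable(String path, Configuration conf) {
    if (path.contains("/")) {
      // Treat it as a hadoop table location, e.g. hdfs://nn:8020/warehouse/db/tbl.
      return new HadoopTables(conf).load(path);
    } else {
      // Otherwise treat it as a hive-catalog identifier, e.g. "db.tbl".
      HiveCatalog catalog = HiveCatalogs.loadCatalog(conf);
      return catalog.loadTable(TableIdentifier.parse(path));
    }
  }
}
```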
########## File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java ##########
@@ -0,0 +1,54 @@
+    // Construct the DataFile and add it into the completeDataFiles.
+    return DataFiles.builder(spec)
+        .withEncryptedOutputFile(currentFile)
+        .withPath(currentFile.encryptingOutputFile().location())

Review comment:
OK, thanks for the reminder.

########## File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java ##########
@@ -0,0 +1,54 @@
+    // Construct the DataFile and add it into the completeDataFiles.
+    return DataFiles.builder(spec)

Review comment:
We have actually skipped the empty `DataFile` construction in the upper layer; you can see the `PartitionWriter` and `UnpartitionedWriter` (we also have a unit test to address it). So there is no need to check it here anymore.
########## File path: flink/src/main/java/org/apache/iceberg/flink/writer/BaseTaskWriter.java ##########
@@ -0,0 +1,54 @@
+    return DataFiles.builder(spec)
+        .withEncryptedOutputFile(currentFile)
+        .withPath(currentFile.encryptingOutputFile().location())
+        .withFileSizeInBytes(fileSizeInBytes)
+        .withPartition(spec.fields().size() == 0 ? null : partitionKey)

Review comment:
It is not necessary to keep the `partitionKey` reference when building DataFiles.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
