openinx commented on a change in pull request #1213: URL: https://github.com/apache/iceberg/pull/1213#discussion_r460634798
########## File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> { + private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap(); + + public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + } + + /** + * Create a PartitionKey from the values in row. + * <p> + * Any PartitionKey returned by this method can be reused by the implementation. + * + * @param row a data row + */ + protected abstract PartitionKey partition(T row); + + @Override + public void write(T row) throws IOException { + PartitionKey partitionKey = partition(row); + + RollingFileAppender writer = writers.get(partitionKey); + if (writer == null) { + // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. + PartitionKey copiedKey = partitionKey.copy(); + writer = new RollingFileAppender(copiedKey); + writers.put(copiedKey, writer); + } + + writer.add(row); + } + + @Override + public void close() throws IOException { + if (!writers.isEmpty()) { + Iterator<RollingFileAppender> iterator = writers.values().iterator(); + while (iterator.hasNext()) { + iterator.next().close(); + // Remove from the writers after closed. + iterator.remove(); Review comment: OK, sounds good. ########## File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Tasks; + +abstract class BaseTaskWriter<T> implements TaskWriter<T> { + private final List<DataFile> completedFiles = Lists.newArrayList(); + private final PartitionSpec spec; + private final FileFormat format; + private final FileAppenderFactory<T> appenderFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final long targetFileSize; + + protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + this.spec = spec; + this.format = format; + this.appenderFactory = appenderFactory; + this.fileFactory = fileFactory; + this.io = io; + this.targetFileSize = targetFileSize; + } + + @Override + public void abort() throws IOException { + close(); + + // clean up files created by this writer + Tasks.foreach(completedFiles) + .throwFailureWhenFinished() + .noRetry() + .run(file -> io.deleteFile(file.path().toString())); + } + + @Override + public List<DataFile> complete() throws IOException { + close(); + + if (completedFiles.size() > 0) { + return ImmutableList.copyOf(completedFiles); + } else { + return Collections.emptyList(); + } + } + + class RollingFileAppender implements Closeable { Review comment: Well, sounds great. ########## File path: core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; + +public class UnpartitionedWriter<T> extends BaseTaskWriter<T> { + + private RollingFileAppender currentAppender = null; + + public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + } + + @Override + public void write(T record) throws IOException { + if (currentAppender == null) { + currentAppender = new RollingFileAppender(null); Review comment: I refactored this part because we don't need to initialize a real writer if no records come in. Before this patch, it would open a real file writer even when there were no records to write, and in the end we had to close that useless writer and clean up its file. ########## File path: spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java ########## @@ -92,16 +96,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception { RowDataReader dataReader = new RowDataReader( task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory( - properties, schema, SparkSchemaUtil.convert(schema)); + StructType structType = SparkSchemaUtil.convert(schema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType); Review comment: OK, let me revert this. ########## File path: flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.flink.data.FlinkAvroWriter; +import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.parquet.Parquet; + +class TaskWriterFactory { + private TaskWriterFactory() { + } + + static TaskWriter<Row> createTaskWriter(Schema schema, + PartitionSpec spec, + FileFormat format, + FileAppenderFactory<Row> appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSizeBytes) { + if (spec.fields().isEmpty()) { + return new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes); + } else { + return new RowPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory, + io, targetFileSizeBytes, schema); + } + } + + private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter<Row> { + + private final PartitionKey partitionKey; + private final RowWrapper rowWrapper; + + RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Row> appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.partitionKey = new PartitionKey(spec, schema); + this.rowWrapper = new RowWrapper(schema.asStruct()); + } + + @Override + protected PartitionKey partition(Row row) { + partitionKey.partition(rowWrapper.wrap(row)); + return partitionKey; + } + } + + static class FlinkFileAppenderFactory implements FileAppenderFactory<Row> { + private final Schema schema; + private final Map<String, String> props; + + FlinkFileAppenderFactory(Schema schema, Map<String, String> props) { + this.schema = schema; + this.props = props; + } + + @Override + public FileAppender<Row> newAppender(OutputFile outputFile, FileFormat format) { Review comment: I see those PRs. I'd prefer to keep the current version so that we can introduce the Flink unit tests to cover this big change; changing it to RowData should be easy in the future, I think. 
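As a side note for readers of this thread, here is a minimal sketch of how `createTaskWriter` is meant to be driven from a sink task. The wrapper class and its method names are hypothetical and only illustrate the call pattern; just the `TaskWriterFactory.createTaskWriter` call and the `TaskWriter` methods come from this PR:

```java
// Hypothetical sketch, not part of this PR: it only shows the intended call pattern
// for TaskWriterFactory. It sits in org.apache.iceberg.flink because the factory is
// package-private.
package org.apache.iceberg.flink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;

class TaskWriterUsageSketch {
  private final TaskWriter<Row> writer;

  TaskWriterUsageSketch(Schema schema, PartitionSpec spec, FileFormat format,
                        FileAppenderFactory<Row> appenderFactory, OutputFileFactory fileFactory,
                        FileIO io, long targetFileSizeBytes) {
    // Unpartitioned tables get an UnpartitionedWriter, partitioned tables the fanout writer.
    this.writer = TaskWriterFactory.createTaskWriter(
        schema, spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes);
  }

  void write(Row row) throws IOException {
    writer.write(row); // routed to the rolling appender for the row's partition
  }

  List<DataFile> completeForCommit() throws IOException {
    // complete() closes any open appenders (see BaseTaskWriter above) and returns
    // the data files that should be attached to the next Iceberg commit.
    return writer.complete();
  }

  void abortOnFailure() throws IOException {
    writer.abort(); // closes the writer and deletes the completed data files
  }
}
```

The factory hides the unpartitioned-vs-fanout decision, so the caller only deals with the `TaskWriter` interface.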
########## File path: spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java ########## @@ -250,33 +253,42 @@ public String toString() { if (spec.fields().isEmpty()) { return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); } else { - return new Partitioned24Writer( - spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema); + return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), + targetFileSize, writeSchema, dsSchema); } } } - private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> { + private static class Unpartitioned24Writer extends UnpartitionedWriter<InternalRow> + implements DataWriter<InternalRow> { Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize); } @Override public WriterCommitMessage commit() throws IOException { - return new TaskCommit(complete()); + this.close(); + + List<DataFile> dataFiles = complete(); + return new TaskCommit(new TaskResult(dataFiles)); Review comment: OK ########## File path: core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> { + private final Map<PartitionKey, RollingFileAppender> writers = Maps.newHashMap(); + + public PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + } + + /** + * Create a PartitionKey from the values in row. + * <p> + * Any PartitionKey returned by this method can be reused by the implementation. + * + * @param row a data row + */ + protected abstract PartitionKey partition(T row); + + @Override + public void write(T row) throws IOException { + PartitionKey partitionKey = partition(row); + + RollingFileAppender writer = writers.get(partitionKey); + if (writer == null) { + // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. 
+ PartitionKey copiedKey = partitionKey.copy(); + writer = new RollingFileAppender(copiedKey); + writers.put(copiedKey, writer); Review comment: > During scan planning, IIUC, an inclusive projection could possibly match a very large number of rows that might fall outside of the predicate range if the RollingFileAppender for this rarely observed predicate at this Task Manager buffers its data for a very long time before writing (say days or even weeks in a longer running streaming query). You mean the Flink streaming reader won't see buffered data that has not yet been committed to the Iceberg table? Actually, that's exactly the expected behavior. Say we have a data pipeline: ``` (flink-streaming-sink-job-A) -> (iceberg table) -> (flink-streaming-reader-job-B). ``` The upstream `flink-streaming-sink-job-A` appends records to the Iceberg table continuously and commits to the table whenever a checkpoint happens. We need to guarantee transactional semantics, so the downstream Flink streaming reader can only see committed Iceberg data; the delta between two contiguous snapshots is the incremental data that the streaming reader should consume. ########## File path: core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java ########## @@ -17,41 +17,44 @@ * under the License. */ -package org.apache.iceberg.spark.source; +package org.apache.iceberg.io; import java.io.IOException; import java.util.Set; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.spark.sql.catalyst.InternalRow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class PartitionedWriter extends BaseWriter { +public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> { private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class); - private final PartitionKey key; - private final InternalRowWrapper wrapper; private final Set<PartitionKey> completedPartitions = Sets.newHashSet(); - PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) { + private PartitionKey currentKey = null; + private RollingFileAppender currentAppender = null; Review comment: Nice finding. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
