aokolnychyi commented on code in PR #6335: URL: https://github.com/apache/iceberg/pull/6335#discussion_r1275770078
########## core/src/main/java/org/apache/iceberg/RollingManifestWriter.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** As opposed to {@link ManifestWriter}, a rolling writer could produce multiple manifest files. 
*/ +abstract class RollingManifestWriter<T extends ContentFile<T>> implements FileAppender<T> { + private final int formatVersion; + private final Long snapshotId; + private final PartitionSpec spec; + private final FileIO io; + private final ManifestOutputFileFactory outputFileFactory; + private final long targetFileSizeInBytes; + private final List<ManifestFile> manifestFiles; + + private long currentFileRows = 0; + private OutputFile currentFile; + private ManifestWriter<T> currentWriter = null; + + private boolean closed = false; + + RollingManifestWriter( + int formatVersion, + Long snapshotId, + PartitionSpec spec, + FileIO io, + ManifestOutputFileFactory outputFileFactory, + long targetFileSizeInBytes) { + this.formatVersion = formatVersion; + this.snapshotId = snapshotId; + this.spec = spec; + this.io = io; + this.outputFileFactory = outputFileFactory; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.manifestFiles = Lists.newArrayList(); + + openCurrentWriter(); + } + + protected abstract ManifestWriter<T> newManifestWriter( + int targetFormatVersion, + Long targetSnapshotId, + PartitionSpec targetSpec, + OutputFile targetManifestPath); + + /** + * Add an added entry for a file. + * + * <p>The entry's snapshot ID will be this manifest's snapshot ID. The data and file sequence + * numbers will be assigned at commit. + * + * @param addedFile a data file + */ + @Override + public void add(T addedFile) { + currentWriter.add(addedFile); + tryRollingToNewFile(); + } + + /** + * Add an added entry for a file with a specific sequence number. + * + * <p>The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence + * number will be the provided data sequence number. The entry's file sequence number will be + * assigned at commit. 
+ * + * @param addedFile a data file + * @param dataSequenceNumber a data sequence number for the file + */ + public void add(T addedFile, long dataSequenceNumber) { + currentWriter.add(addedFile, dataSequenceNumber); + tryRollingToNewFile(); + } + + /** + * Add an existing entry for a file. + * + * <p>The original data and file sequence numbers, snapshot ID, which were assigned at commit, + * must be preserved when adding an existing entry. + * + * @param existingFile a file + * @param fileSnapshotId snapshot ID when the data file was added to the table + * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added) + * @param fileSequenceNumber a file sequence number (assigned when the file was added) + */ + public void existing( + T existingFile, long fileSnapshotId, long dataSequenceNumber, Long fileSequenceNumber) { + currentWriter.existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber); + tryRollingToNewFile(); + } + + /** + * Add a delete entry for a file. + * + * <p>The entry's snapshot ID will be this manifest's snapshot ID. However, the original data and + * file sequence numbers of the file must be preserved when the file is marked as deleted. + * + * @param deletedFile a file + * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added) + * @param fileSequenceNumber a file sequence number (assigned when the file was added) + */ + public void delete(T deletedFile, long dataSequenceNumber, Long fileSequenceNumber) { + currentWriter.delete(deletedFile, dataSequenceNumber, fileSequenceNumber); + tryRollingToNewFile(); + } + + private void tryRollingToNewFile() { + currentFileRows++; + + if (currentWriter.length() > targetFileSizeInBytes) { Review Comment: Will it be expensive to call for every file? In rolling file writers, we check only every 1000 records. 
I understand that a value of 1000 may be too big here, but I still don't think it is a good idea to check the length for every entry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
