JingsongLi commented on code in PR #295:
URL: https://github.com/apache/flink-table-store/pull/295#discussion_r974827083
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataRollingFileWriter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.function.Supplier;
+
+/** A {@link RollingFileWriter} to write {@link KeyValue}s into several rolling data files. */
+public class KeyValueDataRollingFileWriter extends RollingFileWriter<KeyValue, DataFileMeta> {

Review Comment:
   Do we need to create a separate class? I think there is no logic in it.
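   For example, the call site could just instantiate the generic class directly — a minimal sketch (the local variable names are illustrative, not from the PR):

   ```java
   // The subclass adds no behavior of its own, so the generic class is enough.
   // writerFactory is a Supplier<? extends SingleFileWriter<KeyValue, DataFileMeta>>.
   RollingFileWriter<KeyValue, DataFileMeta> writer =
           new RollingFileWriter<>(writerFactory, targetFileSize);
   ```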
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.io;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * A {@link FileWriter} to produce a single file.
+ *
+ * @param <T> type of records to write.
+ * @param <R> type of result to produce after writing a file.
+ */
+public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SingleFileWriter.class);
+
+    private final BulkWriter.Factory<RowData> factory;
+    protected final Path path;
+    private final Function<T, RowData> converter;
+
+    private long recordCount;
+    private long length;
+    protected boolean closed;
+
+    private BulkWriter<RowData> writer;
+    private FSDataOutputStream out;
+
+    public SingleFileWriter(
+            BulkWriter.Factory<RowData> factory, Path path, Function<T, RowData> converter) {
+        this.factory = factory;
+        this.path = path;
+        this.converter = converter;
+
+        this.recordCount = 0;
+        this.length = 0;
+        this.closed = false;
+
+        this.writer = null;
+        this.out = null;
+    }
+
+    public Path path() {
+        return path;
+    }
+
+    @Override
+    public void write(T record) throws IOException {
+        writeImpl(record);
+    }
+
+    protected RowData writeImpl(T record) throws IOException {
+        if (closed) {
+            throw new RuntimeException("Writer has already closed!");
+        }
+
+        try {
+            if (writer == null) {
+                writer = createWriter();
+            }
+            RowData rowData = converter.apply(record);
+            writer.addElement(rowData);
+            recordCount++;
+            return rowData;
+        } catch (Throwable e) {
+            LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e);
+            abort();
+            throw e;
+        }
+    }
+
+    @Override
+    public long recordCount() {
+        return recordCount;
+    }
+
+    @Override
+    public long length() throws IOException {
+        if (closed) {
+            return length;
+        } else {
+            return out.getPos();
+        }
+    }
+
+    @Override
+    public void abort() {
+        IOUtils.closeQuietly(out);
+        FileUtils.deleteOrWarn(path);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing file " + path);
+        }
+
+        try {
+            if (writer != null) {
+                writer.flush();
+                writer.finish();
+            }
+
+            if (out != null) {
+                out.flush();
+                length = out.getPos();
+                out.close();
+            }
+        } catch (IOException e) {
+            LOG.warn("Exception occurs when closing file " + path + ". Cleaning up.", e);
+            abort();
+            throw e;
+        } finally {
+            closed = true;
+        }
+    }
+
+    private BulkWriter<RowData> createWriter() throws IOException {
+        FileSystem fs = path.getFileSystem();
+        out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+        try {
+            return factory.create(out);
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Failed to open the bulk writer, closing the output stream and throw the error.",
+                    e);
+            IOUtils.closeQuietly(out);

Review Comment:
   No need to close `out` here; the caller will invoke `abort`.
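   For example, `createWriter` could simply be (a minimal sketch; it relies on `writeImpl` calling `abort()` in its catch block, which already closes the stream quietly and deletes the file):

   ```java
   private BulkWriter<RowData> createWriter() throws IOException {
       FileSystem fs = path.getFileSystem();
       out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
       // If factory.create fails, the exception propagates to writeImpl,
       // whose catch block invokes abort(); abort() closes `out` quietly
       // and deletes the partial file, so no extra cleanup is needed here.
       return factory.create(out);
   }
   ```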
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java:
##########
@@ -0,0 +1,163 @@
+    protected RowData writeImpl(T record) throws IOException {
+        if (closed) {
+            throw new RuntimeException("Writer has already closed!");
+        }
+
+        try {
+            if (writer == null) {

Review Comment:
   This is a single file writer, so it should produce exactly one file, never zero. We can create the writer in the constructor; this avoids the various inconsistencies caused by not producing a file at all.
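   A minimal sketch of creating the writer eagerly in the constructor (the error handling shown is an assumption about how the constructor could surface failures):

   ```java
   public SingleFileWriter(
           BulkWriter.Factory<RowData> factory, Path path, Function<T, RowData> converter)
           throws IOException {
       this.factory = factory;
       this.path = path;
       this.converter = converter;
       this.recordCount = 0;
       this.closed = false;

       // Open the stream and the bulk writer eagerly, so that even a writer
       // that receives no records still produces exactly one (empty) file.
       FileSystem fs = path.getFileSystem();
       this.out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
       try {
           this.writer = factory.create(out);
       } catch (Throwable e) {
           abort(); // closes the stream quietly and deletes the partial file
           // Precise rethrow: the try block only throws IOException or
           // unchecked exceptions, so `throws IOException` still holds.
           throw e;
       }
   }
   ```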
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java:
##########
@@ -36,36 +38,53 @@
  */
 public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> {
 
-    private final Supplier<? extends FileWriter<T, R>> writerFactory;
+    private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class);
+
+    private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
     private final long targetFileSize;
 
-    private final List<FileWriter<T, R>> openedWriters;
+    private final List<SingleFileWriter<T, R>> openedWriters;
     private final List<R> results;
 
-    private FileWriter<T, R> currentWriter = null;
+    private SingleFileWriter<T, R> currentWriter = null;
     private long lengthOfClosedFiles = 0L;
     private long recordCount = 0;
     private boolean closed = false;
 
     public RollingFileWriter(
-            Supplier<? extends FileWriter<T, R>> writerFactory, long targetFileSize) {
+            Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) {
         this.writerFactory = writerFactory;
         this.targetFileSize = targetFileSize;
         this.openedWriters = new ArrayList<>();
         this.results = new ArrayList<>();
     }
 
+    @VisibleForTesting
+    public long targetFileSize() {
+        return targetFileSize;
+    }
+
     @Override
     public void write(T row) throws IOException {
-        // Open the current writer if write the first record or roll over happen before.
-        if (currentWriter == null) {
-            openCurrentWriter();
-        }
+        try {
+            // Open the current writer if write the first record or roll over happen before.
+            if (currentWriter == null) {
+                openCurrentWriter();
+            }
 
-        currentWriter.write(row);
-        recordCount += 1;
+            currentWriter.write(row);
+            recordCount += 1;
 
-        if (currentWriter.length() >= targetFileSize) {
-            closeCurrentWriter();
+            if (currentWriter.length() >= targetFileSize) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();

Review Comment:
   Add documentation to `FileWriter.abort`: the implementation needs to be reentrant.

##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java:
##########
@@ -36,36 +38,53 @@
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file "
+                            + (currentWriter == null ? null : currentWriter.path())
+                            + ". Cleaning up.",
+                    e);
+            abort();

Review Comment:
   Add documentation to `FileWriter.write`: it now clears the file by itself when an exception occurs during write.
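   For both documentation comments above, a minimal sketch of what the `FileWriter` javadoc could say (the interface is reduced here to the two methods under discussion; the exact wording is only a suggestion):

   ```java
   public interface FileWriter<T, R> {

       /**
        * Add one record to this file writer.
        *
        * <p>NOTE: If any exception occurs while writing, the writer cleans up
        * its own dirty files by itself before rethrowing, so callers must not
        * reuse the writer afterwards.
        *
        * @param record the record to write
        * @throws IOException if an I/O error occurs
        */
       void write(T record) throws IOException;

       /**
        * Abort this writer and clean up any orphan files it has produced.
        *
        * <p>NOTE: Implementations must be reentrant: {@code abort()} may be
        * invoked more than once (e.g. by a failed {@code write} and then again
        * by the caller), and repeated calls must be safe no-ops.
        */
       void abort();
   }
   ```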
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -150,16 +123,24 @@
     public List<DataFileMeta> close() throws Exception {
         List<DataFileMeta> result = new ArrayList<>();
         if (writer != null) {
-            // Abort this writer to clear uncommitted files.
-            writer.abort();
+            writer.close();

Review Comment:
   We don't need `close` to return a result. We can just make `RecordWriter.close` return void.
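   A minimal sketch of the suggested signature (assuming results keep flowing through the existing commit path rather than through `close`; that split is an assumption, not part of this diff):

   ```java
   public interface RecordWriter {

       // ... other methods elided ...

       /**
        * Close this writer: release resources and clean up temporary files.
        * No result is returned; callers obtain the written files through the
        * normal commit path instead.
        */
       void close() throws Exception;
   }
   ```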