JingsongLi commented on code in PR #182:
URL: https://github.com/apache/flink-table-store/pull/182#discussion_r916494299


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyCompactManager.class);
+
+    private final int minFileNum;
+    private final int maxFileNum;
+    private final long targetFileSize;
+    private final CompactRewriter rewriter;
+    private final LinkedList<DataFileMeta> toCompact;
+
+    public AppendOnlyCompactManager(
+            ExecutorService executor,
+            LinkedList<DataFileMeta> toCompact,
+            int minFileNum,
+            int maxFileNum,
+            long targetFileSize,
+            CompactRewriter rewriter) {
+        super(executor);
+        this.toCompact = toCompact;
+        this.maxFileNum = maxFileNum;
+        this.minFileNum = minFileNum;
+        this.targetFileSize = targetFileSize;
+        this.rewriter = rewriter;
+    }
+
+    @Override
+    public void submitCompaction() {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting 
new one.");
+        }
+        pickCompactBefore()
+                .ifPresent(
+                        (before) -> {
+                            if (LOG.isDebugEnabled()) {

Review Comment:
   Can this debug logging be done in `CompactTask`?
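   Something like this, as a rough sketch (assuming we hand the input files to the task up front; names are illustrative, not final):

   ```java
   import org.apache.flink.table.store.file.data.DataFileMeta;

   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.stream.Collectors;

   public abstract class CompactTask implements Callable<CompactResult> {

       private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);

       protected final List<DataFileMeta> inputs;

       protected CompactTask(List<DataFileMeta> inputs) {
           this.inputs = inputs;
       }

       @Override
       public CompactResult call() throws Exception {
           // log once here instead of in every CompactManager implementation
           if (LOG.isDebugEnabled()) {
               LOG.debug(
                       "Compacting files (name, level, size): "
                               + inputs.stream()
                                       .map(f -> String.format(
                                               "(%s, %d, %d)",
                                               f.fileName(), f.level(), f.fileSize()))
                                       .collect(Collectors.joining(", ")));
           }
           return doCompact();
       }

       protected abstract CompactResult doCompact() throws Exception;
   }
   ```

   Then every store flavor gets the same debug output for free.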



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyCompactManager.class);
+
+    private final int minFileNum;
+    private final int maxFileNum;
+    private final long targetFileSize;
+    private final CompactRewriter rewriter;
+    private final LinkedList<DataFileMeta> toCompact;
+
+    public AppendOnlyCompactManager(
+            ExecutorService executor,
+            LinkedList<DataFileMeta> toCompact,
+            int minFileNum,
+            int maxFileNum,
+            long targetFileSize,
+            CompactRewriter rewriter) {
+        super(executor);
+        this.toCompact = toCompact;
+        this.maxFileNum = maxFileNum;
+        this.minFileNum = minFileNum;
+        this.targetFileSize = targetFileSize;
+        this.rewriter = rewriter;
+    }
+
+    @Override
+    public void submitCompaction() {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting 
new one.");
+        }
+        pickCompactBefore()
+                .ifPresent(
+                        (before) -> {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(
+                                        "Submit compaction with files (name, 
level, size): "
+                                                + before.stream()
+                                                        .map(
+                                                                file ->
+                                                                        
String.format(
+                                                                               
 "(%s, %d, %d)",
+                                                                               
 file.fileName(),
+                                                                               
 file.level(),
+                                                                               
 file.fileSize()))
+                                                        
.collect(Collectors.joining(", ")));
+                            }
+                            taskFuture =
+                                    executor.submit(
+                                            new CompactTask() {
+                                                @Override
+                                                protected void doCompact() throws Exception {
+                                                    compactBefore.addAll(before);
+                                                    compactAfter.addAll(rewriter.rewrite(before));
+                                                }
+                                            });
+                        });
+    }
+
+    @VisibleForTesting
+    Optional<List<DataFileMeta>> pickCompactBefore() {
+        long totalFileSize = 0L;
+        int fileNum = 0;
+        LinkedList<DataFileMeta> compactBefore = new LinkedList<>();
+
+        while (!toCompact.isEmpty()) {
+            DataFileMeta file = toCompact.pollFirst();
+            compactBefore.add(file);
+            totalFileSize += file.fileSize();
+            fileNum++;
+            if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
+                    || fileNum >= maxFileNum) {
+                return Optional.of(compactBefore);
+            } else if (totalFileSize >= targetFileSize) {
+                // left pointer shift one pos to right
+                DataFileMeta removed = compactBefore.pollFirst();
+                assert removed != null;
+                totalFileSize -= removed.fileSize();
+                fileNum--;
+            }
+        }
+        for (DataFileMeta file : compactBefore) {

Review Comment:
   `toCompact.addAll(compactBefore)`?
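   i.e. if the truncated loop body just re-queues the leftover candidates one by one, the tail of `pickCompactBefore` could collapse to (sketch):

   ```java
   // no pick satisfied the thresholds; put all remaining candidates back
   toCompact.addAll(compactBefore);
   return Optional.empty();
   ```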



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/** Compact task. */
+public abstract class CompactTask implements Callable<CompactResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+    protected final List<DataFileMeta> compactBefore;
+
+    protected final List<DataFileMeta> compactAfter;
+
+    // metrics
+    private long rewriteInputSize;
+    private long rewriteOutputSize;
+
+    public CompactTask() {
+        this.compactBefore = new ArrayList<>();
+        this.compactAfter = new ArrayList<>();
+        this.rewriteInputSize = 0;
+        this.rewriteOutputSize = 0;
+    }
+
+    @Override
+    public CompactResult call() throws Exception {
+        long startMillis = System.currentTimeMillis();
+        doCompact();

Review Comment:
   I think it is better to introduce a `List doCompact(List inputs)`; this way subclasses do not have to manipulate mutable collection objects. Consider, for example, what happens if `doCompact` is executed twice: the shared `compactBefore` / `compactAfter` lists would accumulate duplicate entries.
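   Roughly like this (a sketch; `toResult` is a placeholder for however the `CompactResult` gets assembled):

   ```java
   public abstract class CompactTask implements Callable<CompactResult> {

       private final List<DataFileMeta> inputs;

       protected CompactTask(List<DataFileMeta> inputs) {
           this.inputs = inputs;
       }

       @Override
       public CompactResult call() throws Exception {
           // doCompact returns a fresh list each time, so running the task twice
           // cannot double-append into shared mutable state
           List<DataFileMeta> outputs = doCompact(inputs);
           return toResult(inputs, outputs);
       }

       protected abstract List<DataFileMeta> doCompact(List<DataFileMeta> inputs)
               throws Exception;
   }
   ```

   The append-only implementation then becomes a one-liner: `return rewriter.rewrite(inputs);`.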



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -126,31 +150,122 @@ public List<DataFileMeta> close() throws Exception {
         return result;
     }
 
-    private RowRollingWriter createRollingRowWriter() {
-        return new RowRollingWriter(
-                () -> new RowFileWriter(fileWriterFactory, pathFactory.newPath()), targetFileSize);
+    private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+        return fileMetas.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    private void submitCompaction() {
+        if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {
+            compactManager.submitCompaction();
+        }
     }
 
-    private class RowRollingWriter extends RollingFileWriter<RowData, 
DataFileMeta> {
+    private void finishCompaction(boolean blocking)
+            throws ExecutionException, InterruptedException {
+        compactManager
+                .finishCompaction(blocking)
+                .ifPresent(
+                        result -> {
+                            compactBefore.addAll(result.before());
+                            compactAfter.addAll(result.after());
+                            if (!result.after().isEmpty()) {
+                                // if the last compacted file is still small,
+                                // add it back to the head
+                                DataFileMeta lastFile =
+                                        result.after().get(result.after().size() - 1);
+                                if (lastFile.fileSize() < targetFileSize) {
+                                    toCompact.offerFirst(lastFile);
+                                }
+                            }
+                        });
+    }
+
+    private Increment drainIncrement(List<DataFileMeta> newFiles) {
+        Increment increment =
+                new Increment(
+                        newFiles, new ArrayList<>(compactBefore), new ArrayList<>(compactAfter));
+        compactBefore.clear();
+        compactAfter.clear();
+        // add new generated files
+        newFiles.forEach(toCompact::offerLast);
+        return increment;
+    }
+
+    @VisibleForTesting
+    List<DataFileMeta> getToCompact() {
+        return toCompact;
+    }
+
+    /** Rolling file writer for append-only table. */
+    public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
 
         public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) {
             super(writerFactory, targetFileSize);
         }
+
+        public static RowRollingWriter createRollingRowWriter(
+                long schemaId,
+                FileFormat fileFormat,
+                long targetFileSize,
+                RowType writeSchema,
+                DataFilePathFactory pathFactory,
+                AtomicLong nextSeqNum) {
+            return new RowRollingWriter(
+                    () ->
+                            new RowFileWriter(
+                                    MetricFileWriter.createFactory(
+                                            fileFormat.createWriterFactory(writeSchema),
+                                            Function.identity(),
+                                            writeSchema,
+                                            fileFormat
+                                                    .createStatsExtractor(writeSchema)
+                                                    .orElse(null)),
+                                    pathFactory.newPath(),
+                                    writeSchema,
+                                    schemaId,
+                                    nextSeqNum),
+                    targetFileSize);
+        }
+
+        public List<DataFileMeta> write(CloseableIterator<RowData> iterator) 
throws Exception {
+            try {
+                super.write(iterator);
+                super.close();
+                return super.result();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                iterator.close();
+            }
+        }
     }
 
-    private class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
-        private final long minSeqNum;
+    /** */
+    public static class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
 
-        public RowFileWriter(FileWriter.Factory<RowData, Metric> writerFactory, Path path) {
+        private final FieldStatsArraySerializer statsArraySerializer;
+        private final long schemaId;
+        private final AtomicLong minSeqNum;
+
+        public RowFileWriter(
+                FileWriter.Factory<RowData, Metric> writerFactory,
+                Path path,
+                RowType writeSchema,
+                long schemaId,
+                AtomicLong minSeqNum) {

Review Comment:
   Can you introduce a `LongCounter`? We don't need a thread-safe `AtomicLong`.
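   For example with Flink's `org.apache.flink.api.common.accumulators.LongCounter` (sketch; assumes the writer is only ever driven by a single thread, so the CAS overhead of `AtomicLong` buys us nothing):

   ```java
   import org.apache.flink.api.common.accumulators.LongCounter;

   // illustrative: single-threaded sequence number assignment
   LongCounter seqNumCounter = new LongCounter(maxSequenceNumber + 1);
   long current = seqNumCounter.getLocalValue(); // plain read, no volatile semantics
   seqNumCounter.add(1L);                        // plain increment, no CAS
   ```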



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -92,22 +108,30 @@ public void write(RowData rowData) throws Exception {
 
     @Override
     public Increment prepareCommit() throws Exception {
-        List<DataFileMeta> newFiles = new ArrayList<>();
+        submitCompaction();

Review Comment:
   Add the new files first, and then `submitCompaction`?
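   i.e. roughly (sketch; `flushNewFiles` is a placeholder for the existing flush logic, and the `offerLast` would then move out of `drainIncrement`):

   ```java
   @Override
   public Increment prepareCommit() throws Exception {
       List<DataFileMeta> newFiles = flushNewFiles();
       // make this round's files visible to the picker before submitting,
       // so the compaction we kick off can already include them
       newFiles.forEach(toCompact::offerLast);
       submitCompaction();
       return drainIncrement(newFiles);
   }
   ```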



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -126,31 +150,122 @@ public List<DataFileMeta> close() throws Exception {
         return result;
     }
 
-    private RowRollingWriter createRollingRowWriter() {
-        return new RowRollingWriter(
-                () -> new RowFileWriter(fileWriterFactory, pathFactory.newPath()), targetFileSize);
+    private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+        return fileMetas.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    private void submitCompaction() {
+        if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {

Review Comment:
   Why not `finishCompaction(false)` first?
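   i.e. something like (sketch):

   ```java
   private void submitCompaction() throws ExecutionException, InterruptedException {
       // drain an already-completed task first, so a finished compaction
       // frees the slot instead of silently skipping this round's submission
       finishCompaction(false);
       if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {
           compactManager.submitCompaction();
       }
   }
   ```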



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyCompactManager.class);
+
+    private final int minFileNum;
+    private final int maxFileNum;
+    private final long targetFileSize;
+    private final CompactRewriter rewriter;
+    private final LinkedList<DataFileMeta> toCompact;
+
+    public AppendOnlyCompactManager(
+            ExecutorService executor,
+            LinkedList<DataFileMeta> toCompact,
+            int minFileNum,
+            int maxFileNum,
+            long targetFileSize,
+            CompactRewriter rewriter) {
+        super(executor);
+        this.toCompact = toCompact;
+        this.maxFileNum = maxFileNum;
+        this.minFileNum = minFileNum;
+        this.targetFileSize = targetFileSize;
+        this.rewriter = rewriter;
+    }
+
+    @Override
+    public void submitCompaction() {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting 
new one.");
+        }
+        pickCompactBefore()
+                .ifPresent(
+                        (before) -> {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(
+                                        "Submit compaction with files (name, 
level, size): "
+                                                + before.stream()
+                                                        .map(
+                                                                file ->
+                                                                        
String.format(
+                                                                               
 "(%s, %d, %d)",
+                                                                               
 file.fileName(),
+                                                                               
 file.level(),
+                                                                               
 file.fileSize()))
+                                                        
.collect(Collectors.joining(", ")));
+                            }
+                            taskFuture =
+                                    executor.submit(
+                                            new CompactTask() {
+                                                @Override
+                                                protected void doCompact() throws Exception {
+                                                    compactBefore.addAll(before);
+                                                    compactAfter.addAll(rewriter.rewrite(before));
+                                                }
+                                            });
+                        });
+    }
+
+    @VisibleForTesting
+    Optional<List<DataFileMeta>> pickCompactBefore() {
+        long totalFileSize = 0L;
+        int fileNum = 0;
+        LinkedList<DataFileMeta> compactBefore = new LinkedList<>();

Review Comment:
   Rename to `candidates`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyCompactManager.class);
+
+    private final int minFileNum;
+    private final int maxFileNum;
+    private final long targetFileSize;
+    private final CompactRewriter rewriter;
+    private final LinkedList<DataFileMeta> toCompact;
+
+    public AppendOnlyCompactManager(
+            ExecutorService executor,
+            LinkedList<DataFileMeta> toCompact,
+            int minFileNum,
+            int maxFileNum,
+            long targetFileSize,
+            CompactRewriter rewriter) {
+        super(executor);
+        this.toCompact = toCompact;
+        this.maxFileNum = maxFileNum;
+        this.minFileNum = minFileNum;
+        this.targetFileSize = targetFileSize;
+        this.rewriter = rewriter;
+    }
+
+    @Override
+    public void submitCompaction() {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting 
new one.");
+        }
+        pickCompactBefore()
+                .ifPresent(
+                        (before) -> {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(
+                                        "Submit compaction with files (name, 
level, size): "
+                                                + before.stream()
+                                                        .map(
+                                                                file ->
+                                                                        
String.format(
+                                                                               
 "(%s, %d, %d)",
+                                                                               
 file.fileName(),
+                                                                               
 file.level(),
+                                                                               
 file.fileSize()))
+                                                        
.collect(Collectors.joining(", ")));
+                            }
+                            taskFuture =
+                                    executor.submit(
+                                            new CompactTask() {
+                                                @Override
+                                                protected void doCompact() throws Exception {
+                                                    compactBefore.addAll(before);
+                                                    compactAfter.addAll(rewriter.rewrite(before));
+                                                }
+                                            });
+                        });
+    }
+
+    @VisibleForTesting
+    Optional<List<DataFileMeta>> pickCompactBefore() {
+        long totalFileSize = 0L;
+        int fileNum = 0;
+        LinkedList<DataFileMeta> compactBefore = new LinkedList<>();
+
+        while (!toCompact.isEmpty()) {
+            DataFileMeta file = toCompact.pollFirst();
+            compactBefore.add(file);
+            totalFileSize += file.fileSize();
+            fileNum++;
+            if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
+                    || fileNum >= maxFileNum) {
+                return Optional.of(compactBefore);
+            } else if (totalFileSize >= targetFileSize) {
+                // left pointer shift one pos to right
+                DataFileMeta removed = compactBefore.pollFirst();
+                assert removed != null;
+                totalFileSize -= removed.fileSize();
+                fileNum--;
+            }
+        }
+        for (DataFileMeta file : compactBefore) {

Review Comment:
   At this point, `toCompact` should be empty, right? Assert this?
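   e.g. (sketch, combining this with the `addAll` suggestion above; uses `org.apache.flink.util.Preconditions.checkState`):

   ```java
   // every candidate was polled off toCompact and nothing has been put back yet
   checkState(toCompact.isEmpty(), "Expected toCompact to be fully drained here.");
   toCompact.addAll(compactBefore);
   return Optional.empty();
   ```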



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -126,31 +150,122 @@ public List<DataFileMeta> close() throws Exception {
         return result;
     }
 
-    private RowRollingWriter createRollingRowWriter() {
-        return new RowRollingWriter(
-                () -> new RowFileWriter(fileWriterFactory, pathFactory.newPath()), targetFileSize);
+    private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+        return fileMetas.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    private void submitCompaction() {
+        if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {
+            compactManager.submitCompaction();
+        }
     }
 
-    private class RowRollingWriter extends RollingFileWriter<RowData, 
DataFileMeta> {
+    private void finishCompaction(boolean blocking)
+            throws ExecutionException, InterruptedException {
+        compactManager
+                .finishCompaction(blocking)
+                .ifPresent(
+                        result -> {
+                            compactBefore.addAll(result.before());
+                            compactAfter.addAll(result.after());
+                            if (!result.after().isEmpty()) {
+                                // if the last compacted file is still small,
+                                // add it back to the head
+                                DataFileMeta lastFile =
+                                        result.after().get(result.after().size() - 1);
+                                if (lastFile.fileSize() < targetFileSize) {
+                                    toCompact.offerFirst(lastFile);
+                                }
+                            }
+                        });
+    }
+
+    private Increment drainIncrement(List<DataFileMeta> newFiles) {
+        Increment increment =
+                new Increment(
+                        newFiles, new ArrayList<>(compactBefore), new ArrayList<>(compactAfter));
+        compactBefore.clear();
+        compactAfter.clear();
+        // add new generated files
+        newFiles.forEach(toCompact::offerLast);
+        return increment;
+    }
+
+    @VisibleForTesting
+    List<DataFileMeta> getToCompact() {
+        return toCompact;
+    }
+
+    /** Rolling file writer for append-only table. */
+    public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
 
         public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) {
             super(writerFactory, targetFileSize);
         }
+
+        public static RowRollingWriter createRollingRowWriter(
+                long schemaId,
+                FileFormat fileFormat,
+                long targetFileSize,
+                RowType writeSchema,
+                DataFilePathFactory pathFactory,
+                AtomicLong nextSeqNum) {
+            return new RowRollingWriter(
+                    () ->
+                            new RowFileWriter(
+                                    MetricFileWriter.createFactory(
+                                            fileFormat.createWriterFactory(writeSchema),
+                                            Function.identity(),
+                                            writeSchema,
+                                            fileFormat
+                                                    .createStatsExtractor(writeSchema)
+                                                    .orElse(null)),
+                                    pathFactory.newPath(),
+                                    writeSchema,
+                                    schemaId,
+                                    nextSeqNum),
+                    targetFileSize);
+        }
+
+        public List<DataFileMeta> write(CloseableIterator<RowData> iterator) 
throws Exception {
+            try {
+                super.write(iterator);
+                super.close();
+                return super.result();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                iterator.close();
+            }
+        }
     }
 
-    private class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
-        private final long minSeqNum;
+    /** */
+    public static class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
 
-        public RowFileWriter(FileWriter.Factory<RowData, Metric> writerFactory, Path path) {
+        private final FieldStatsArraySerializer statsArraySerializer;
+        private final long schemaId;
+        private final AtomicLong minSeqNum;
+
+        public RowFileWriter(
+                FileWriter.Factory<RowData, Metric> writerFactory,
+                Path path,
+                RowType writeSchema,
+                long schemaId,
+                AtomicLong minSeqNum) {

Review Comment:
   Maybe it is not really a `minSeqNum`; should it be renamed to `seqNumCounter`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
