JingsongLi commented on code in PR #182:
URL: https://github.com/apache/flink-table-store/pull/182#discussion_r915444076
##########
docs/layouts/shortcodes/generated/core_configuration.html:
##########
@@ -36,13 +36,25 @@
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
<td>Integer</td>
- <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree.</td>
+ <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.</td>
+ </tr>
+ <tr>
+ <td><h5>compaction.max.file-num</h5></td>
Review Comment:
Maybe change a word? If 100 small files are given to compaction directly, they will still all be compacted together, so this should not be understood as a typical max.
Maybe `compaction.early-max.file-num`?
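A tiny sketch of the semantics being described (names here are illustrative only, not from the PR): the option decides *when* a compaction fires, not how many files a single compaction may rewrite, which is why a plain `max` reads misleadingly:
```
// Hypothetical illustration only: the configured number acts as an eager trigger.
// A backlog of 100 small files handed to one compaction is still rewritten
// together in a single task, so this is a trigger threshold, not an upper bound.
static boolean shouldTriggerCompaction(int pendingFileNum, int earlyMaxFileNum) {
    return pendingFileNum >= earlyMaxFileNum;
}
```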
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java:
##########
@@ -214,17 +214,36 @@ public class CoreOptions implements Serializable {
                    .defaultValue(200)
                    .withDescription(
                            "The size amplification is defined as the amount (in percentage) of additional storage "
-                                   + "needed to store a single byte of data in the merge tree.");
+                                   + "needed to store a single byte of data in the merge tree for changelog mode table.");
    public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
            ConfigOptions.key("compaction.size-ratio")
                    .intType()
                    .defaultValue(1)
                    .withDescription(
-                           "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
+                           "Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) "
                                    + "size is 1% smaller than the next sorted run's size, then include next sorted run "
                                    + "into this candidate set.");
+   public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
+           ConfigOptions.key("compaction.min.file-num")
+                   .intType()
+                   .defaultValue(4)
Review Comment:
5?
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/** Compact task. */
+public abstract class CompactTask implements Callable<CompactResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+ // metrics
+ protected long rewriteInputSize;
+ protected long rewriteOutputSize;
+ protected int rewriteFilesNum;
+
+ public CompactTask() {
+ this.rewriteInputSize = 0;
Review Comment:
We don't need this constructor.
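(For context: Java already zero-initializes numeric instance fields, so the class behaves identically without it; a minimal sketch:)
```
// Sketch: the explicit no-arg constructor assigning zeros is redundant,
// because long and int instance fields default to 0 in Java.
public abstract class CompactTask implements Callable<CompactResult> {
    protected long rewriteInputSize;  // 0L by default
    protected long rewriteOutputSize; // 0L by default
    protected int rewriteFilesNum;    // 0 by default
}
```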
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/** Compact task. */
+public abstract class CompactTask implements Callable<CompactResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+ // metrics
+ protected long rewriteInputSize;
+ protected long rewriteOutputSize;
+ protected int rewriteFilesNum;
+
+ public CompactTask() {
Review Comment:
Pass the `compactBefore` files in here?
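Roughly like this, so the task owns its input files instead of capturing them in an anonymous subclass (a sketch of the constructor shape, not the final API):
```
// Sketch: hand the "compact before" files to the task up front.
public abstract class CompactTask implements Callable<CompactResult> {

    protected final List<DataFileMeta> inputs;

    protected CompactTask(List<DataFileMeta> inputs) {
        this.inputs = inputs;
    }
}
```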
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+ private final int minFileNum;
+ private final int maxFileNum;
+ private final long targetFileSize;
+ private final CompactRewriter rewriter;
+ private final LinkedList<DataFileMeta> toCompact;
+
+ public AppendOnlyCompactManager(
+ ExecutorService executor,
+ LinkedList<DataFileMeta> toCompact,
+ int minFileNum,
+ int maxFileNum,
+ long targetFileSize,
+ CompactRewriter rewriter) {
+ super(executor);
+ this.toCompact = toCompact;
+ this.maxFileNum = maxFileNum;
+ this.minFileNum = minFileNum;
+ this.targetFileSize = targetFileSize;
+ this.rewriter = rewriter;
+ }
+
+ @Override
+ public void submitCompaction() {
+     if (taskFuture != null) {
+         throw new IllegalStateException(
+                 "Please finish the previous compaction before submitting new one.");
+     }
+     pickCompactBefore()
+             .ifPresent(
+                     (compactBefore) ->
+                             taskFuture =
+                                     executor.submit(
+                                             new CompactTask() {
+                                                 @Override
+                                                 protected CompactResult compact() throws Exception {
+                                                     collectBeforeStats(compactBefore);
+                                                     List<DataFileMeta> compactAfter = rewriter.rewrite(compactBefore);
+                                                     collectAfterStats(compactAfter);
+                                                     return result(compactBefore, compactAfter);
+                                                 }
+                                             }));
+ }
+
+ @VisibleForTesting
+ Optional<List<DataFileMeta>> pickCompactBefore() {
+ long totalFileSize = 0L;
+ int fileNum = 0;
+ int releaseCtr = 0;
+ for (int i = 0; i < toCompact.size(); i++) {
+ DataFileMeta file = toCompact.get(i);
+ totalFileSize += file.fileSize();
+ fileNum++;
+ int pos = i - fileNum + 1;
+ if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
+ || fileNum >= maxFileNum) {
+ // trigger compaction for [pos, i]
+ List<DataFileMeta> compactBefore = new ArrayList<>(toCompact.subList(pos, i + 1));
Review Comment:
List<DataFileMeta> compactBefore = new ArrayList<>();
for (int j = 0; j <= pos - 1; j++) {
compactBefore.add(toCompact.pollFirst());
}
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyRollingFileWriter.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.FileWriter;
+import org.apache.flink.table.store.file.writer.Metric;
+import org.apache.flink.table.store.file.writer.MetricFileWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Rolling file writer for append-only table. */
+public class AppendOnlyRollingFileWriter {
Review Comment:
just a static method in `AppendOnlyWriter`?
```
static RowRollingWriter createRollingRowWriter(.....);
```
##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java:
##########
@@ -20,6 +20,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
Review Comment:
Rename this to `AppendOnlyTableITCase`
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/** Compact task. */
+public abstract class CompactTask implements Callable<CompactResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+ // metrics
+ protected long rewriteInputSize;
+ protected long rewriteOutputSize;
+ protected int rewriteFilesNum;
+
+ public CompactTask() {
+ this.rewriteInputSize = 0;
+ this.rewriteOutputSize = 0;
+ this.rewriteFilesNum = 0;
+ }
+
+ @Override
+ public CompactResult call() throws Exception {
+ long startMillis = System.currentTimeMillis();
+ CompactResult result = compact();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(logMetric(startMillis, result));
+ }
+ return result;
+ }
+
+ protected String logMetric(long startMillis, CompactResult result) {
+ return String.format(
+ "Done compacting %d files to %d files in %dms. "
+ + "Rewrite input size = %d, output size = %d, rewrite file num = %d",
+ result.before().size(),
+ result.after().size(),
+ System.currentTimeMillis() - startMillis,
+ rewriteInputSize,
+ rewriteOutputSize,
+ rewriteFilesNum);
+ }
+
+ protected abstract CompactResult compact() throws Exception;
+
+ protected CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
Review Comment:
`result`, `collectBeforeStats`, and `collectAfterStats` don't need to be exposed to subclasses.
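In other words, keep `compact` as the only extension point and make the bookkeeping private; a sketch, assuming `CompactResult` is an interface with `before()`/`after()` accessors as the logging code suggests:
```
// Sketch: stats collection and result assembly become private details of the
// base class; subclasses only override compact().
private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
    return new CompactResult() {
        @Override
        public List<DataFileMeta> before() {
            return before;
        }

        @Override
        public List<DataFileMeta> after() {
            return after;
        }
    };
}
```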
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java:
##########
@@ -82,9 +100,48 @@ public Callable<CompactResult> createCompactWriter(
}
private RecordWriter<RowData> createWriter(
- BinaryRowData partition, int bucket, long maxSeqNum) {
+ BinaryRowData partition,
+ int bucket,
+ List<DataFileMeta> restoredFiles,
+ ExecutorService compactExecutor) {
+ // let writer and compact manager hold the same reference
+ // and make restore files mutable to update
+ LinkedList<DataFileMeta> toCompact = new LinkedList<>(restoredFiles);
Review Comment:
Maybe we don't need to restore files.
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -19,66 +19,59 @@
package org.apache.flink.table.store.file.data;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.stats.BinaryTableStats;
-import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
-import org.apache.flink.table.store.file.utils.FileUtils;
-import org.apache.flink.table.store.file.writer.BaseFileWriter;
-import org.apache.flink.table.store.file.writer.FileWriter;
-import org.apache.flink.table.store.file.writer.Metric;
-import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import java.util.concurrent.ExecutionException;
/**
* A {@link RecordWriter} implementation that only accepts records which are always insert
* operations and don't have any unique keys or sort keys.
*/
public class AppendOnlyWriter implements RecordWriter<RowData> {
+
private final long schemaId;
+ private final FileFormat fileFormat;
private final long targetFileSize;
+ private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
- private final FieldStatsArraySerializer statsArraySerializer;
-
- private final FileWriter.Factory<RowData, Metric> fileWriterFactory;
- private long nextSeqNum;
+ private final AppendOnlyCompactManager compactManager;
+ private final boolean forceCompact;
+ private final LinkedList<DataFileMeta> toCompact;
+ private final List<DataFileMeta> compactBefore;
+ private final List<DataFileMeta> compactAfter;
- private RowRollingWriter writer;
+ private AppendOnlyRollingFileWriter writer;
public AppendOnlyWriter(
long schemaId,
FileFormat fileFormat,
long targetFileSize,
RowType writeSchema,
- long maxWroteSeqNumber,
+ LinkedList<DataFileMeta> toCompact,
+ AppendOnlyCompactManager compactManager,
+ boolean forceCompact,
DataFilePathFactory pathFactory) {
this.schemaId = schemaId;
+ this.fileFormat = fileFormat;
this.targetFileSize = targetFileSize;
+ this.writeSchema = writeSchema;
this.pathFactory = pathFactory;
- this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
-
- // Initialize the file writer factory to write records and generic metric.
- this.fileWriterFactory =
- MetricFileWriter.createFactory(
- fileFormat.createWriterFactory(writeSchema),
- Function.identity(),
- writeSchema,
- fileFormat.createStatsExtractor(writeSchema).orElse(null));
-
- this.nextSeqNum = maxWroteSeqNumber + 1;
- this.writer = createRollingRowWriter();
+ this.compactManager = compactManager;
+ this.forceCompact = forceCompact;
+ this.toCompact = toCompact;
+ this.compactBefore = new ArrayList<>();
+ this.compactAfter = new ArrayList<>();
+ this.writer = createRollingFileWriter(getMaxSequenceNumber(new ArrayList<>(toCompact)) + 1);
Review Comment:
why copy `toCompact`?
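(If `getMaxSequenceNumber` accepts any `List<DataFileMeta>`, the `LinkedList` could presumably be passed as-is; a defensive copy only matters if the callee mutates or retains its argument:)
```
// Sketch: pass the shared list directly, assuming getMaxSequenceNumber only reads it.
this.writer = createRollingFileWriter(getMaxSequenceNumber(toCompact) + 1);
```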
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+ private final int minFileNum;
+ private final int maxFileNum;
+ private final long targetFileSize;
+ private final CompactRewriter rewriter;
+ private final LinkedList<DataFileMeta> toCompact;
+
+ public AppendOnlyCompactManager(
+ ExecutorService executor,
+ LinkedList<DataFileMeta> toCompact,
+ int minFileNum,
+ int maxFileNum,
+ long targetFileSize,
+ CompactRewriter rewriter) {
+ super(executor);
+ this.toCompact = toCompact;
+ this.maxFileNum = maxFileNum;
+ this.minFileNum = minFileNum;
+ this.targetFileSize = targetFileSize;
+ this.rewriter = rewriter;
+ }
+
+ @Override
+ public void submitCompaction() {
+     if (taskFuture != null) {
+         throw new IllegalStateException(
+                 "Please finish the previous compaction before submitting new one.");
+     }
+     pickCompactBefore()
+             .ifPresent(
+                     (compactBefore) ->
+                             taskFuture =
+                                     executor.submit(
+                                             new CompactTask() {
+                                                 @Override
+                                                 protected CompactResult compact() throws Exception {
+                                                     collectBeforeStats(compactBefore);
+                                                     List<DataFileMeta> compactAfter = rewriter.rewrite(compactBefore);
+                                                     collectAfterStats(compactAfter);
+                                                     return result(compactBefore, compactAfter);
+                                                 }
+                                             }));
+ }
+
+ @VisibleForTesting
+ Optional<List<DataFileMeta>> pickCompactBefore() {
+ long totalFileSize = 0L;
+ int fileNum = 0;
+ int releaseCtr = 0;
+ for (int i = 0; i < toCompact.size(); i++) {
+ DataFileMeta file = toCompact.get(i);
+ totalFileSize += file.fileSize();
+ fileNum++;
+ int pos = i - fileNum + 1;
+ if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
+ || fileNum >= maxFileNum) {
+ // trigger compaction for [pos, i]
+ List<DataFileMeta> compactBefore = new ArrayList<>(toCompact.subList(pos, i + 1));
+ // files in [0, pos - 1] can be released immediately
+ // [pos, i] should be released after compaction finished
+ for (int j = 0; j <= pos - 1; j++) {
+ toCompact.pollFirst();
+ }
+ return Optional.of(compactBefore);
+ } else if (totalFileSize >= targetFileSize) {
+ // this is equivalent to shift one pos to right
+ fileNum--;
Review Comment:
Why not poll first directly?
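One possible reading of that suggestion, as a sketch only (note it releases skipped head files eagerly, whereas the index-based version defers the release until a pick succeeds):
```
// Sketch of a polling variant: the candidate window always starts at the head
// of toCompact, so sliding the window is just pollFirst().
@VisibleForTesting
Optional<List<DataFileMeta>> pickCompactBefore() {
    LinkedList<DataFileMeta> window = new LinkedList<>();
    long totalFileSize = 0L;
    int idx = 0;
    while (idx < toCompact.size()) {
        DataFileMeta file = toCompact.get(idx++);
        window.add(file);
        totalFileSize += file.fileSize();
        if ((totalFileSize >= targetFileSize && window.size() >= minFileNum)
                || window.size() >= maxFileNum) {
            // picked files stay in toCompact until the compaction finishes
            return Optional.of(new ArrayList<>(window));
        }
        if (totalFileSize >= targetFileSize) {
            // too few files for a worthwhile compaction: slide the window by
            // releasing its head, polling it off the queue directly
            totalFileSize -= window.pollFirst().fileSize();
            toCompact.pollFirst();
            idx--; // the underlying list shrank by one
        }
    }
    return Optional.empty();
}
```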
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/** Compact task. */
+public abstract class CompactTask implements Callable<CompactResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+ // metrics
+ protected long rewriteInputSize;
+ protected long rewriteOutputSize;
+ protected int rewriteFilesNum;
+
+ public CompactTask() {
+ this.rewriteInputSize = 0;
+ this.rewriteOutputSize = 0;
+ this.rewriteFilesNum = 0;
+ }
+
+ @Override
+ public CompactResult call() throws Exception {
+ long startMillis = System.currentTimeMillis();
+ CompactResult result = compact();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(logMetric(startMillis, result));
+ }
+ return result;
+ }
+
+ protected String logMetric(long startMillis, CompactResult result) {
+ return String.format(
+ "Done compacting %d files to %d files in %dms. "
+ + "Rewrite input size = %d, output size = %d, rewrite file num = %d",
+ result.before().size(),
+ result.after().size(),
+ System.currentTimeMillis() - startMillis,
+ rewriteInputSize,
+ rewriteOutputSize,
+ rewriteFilesNum);
+ }
+
+ protected abstract CompactResult compact() throws Exception;
Review Comment:
`List<DataFileMeta> compact(List<DataFileMeta> inputs)`?
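With that signature the base class could drive the whole lifecycle in `call()`; a rough sketch, combined with the constructor suggestion above:
```
// Sketch: the base class owns stats and result assembly around the rewrite.
@Override
public CompactResult call() throws Exception {
    long startMillis = System.currentTimeMillis();
    collectBeforeStats(inputs);
    List<DataFileMeta> compactAfter = compact(inputs);
    collectAfterStats(compactAfter);
    CompactResult result = result(inputs, compactAfter);
    if (LOG.isDebugEnabled()) {
        LOG.debug(logMetric(startMillis, result));
    }
    return result;
}

protected abstract List<DataFileMeta> compact(List<DataFileMeta> inputs) throws Exception;
```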
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java:
##########
@@ -88,6 +81,9 @@ public void write(RowData rowData) throws Exception {
"Append-only writer can only accept insert row kind, but current row kind is: %s",
rowData.getRowKind());
writer.write(rowData);
+ if (!toCompact.isEmpty()) {
+ submitCompaction();
Review Comment:
Submit compaction per record?
I think this should happen on file rolling or in prepareCommit.
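For instance, roughly (a sketch; `drainIncrement` is a hypothetical helper here, and whether the trigger belongs in the rolling writer or in prepareCommit is the open question):
```
// Sketch: check the compaction trigger once per commit instead of per record.
@Override
public Increment prepareCommit() throws Exception {
    List<DataFileMeta> newFiles = writer.close(); // roll the current file (sketch)
    if (!toCompact.isEmpty()) {
        submitCompaction();
    }
    return drainIncrement(newFiles); // hypothetical helper assembling the Increment
}
```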