lirui-apache commented on a change in pull request #13852:
URL: https://github.com/apache/flink/pull/13852#discussion_r515763622
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.sql; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.scala.DataStream; +import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Streaming sink File Compaction ITCase base, test checkpoint. + */ +public abstract class FileCompactionITCaseBase extends StreamingTestBase { + + @Rule + public Timeout timeoutPerTest = Timeout.seconds(60); + + private String resultPath; + + private List<Row> rows; + + @Before + public void init() throws IOException { + resultPath = tempFolder().newFolder().toURI().toString(); + clear(); Review comment: Why do we need to call clear here? ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.runtime.stream.sql; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.scala.DataStream; +import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Streaming sink File Compaction ITCase base, test checkpoint. + */ +public abstract class FileCompactionITCaseBase extends StreamingTestBase { + + @Rule + public Timeout timeoutPerTest = Timeout.seconds(60); + + private String resultPath; + + private List<Row> rows; + + @Before + public void init() throws IOException { + resultPath = tempFolder().newFolder().toURI().toString(); + clear(); + + env().setParallelism(3); + env().enableCheckpointing(100); + + rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i))); + } + + DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource( + new ParallelFiniteTestSource<>(rows), + new RowTypeInfo( + new TypeInformation[] {Types.INT, Types.STRING, Types.STRING}, + new String[] {"a", "b", "c"}))); + + tEnv().createTemporaryView("my_table", stream); + } + + @After + public void clear() throws IOException { + FileUtils.deleteDirectory(new File(URI.create(resultPath))); + } + + protected abstract String format(); + + @Test + public void testNonPartition() throws Exception { + tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")"); + tEnv().executeSql("insert into sink_table select * from my_table").await(); + + List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect()); + results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0))); + assertEquals(rows, results); + + File[] files = new File(URI.create(resultPath)).listFiles( + (dir, name) -> name.startsWith("compacted-part-")); Review comment: Can we reuse the `COMPACTED_PREFIX` defined in `CompactOperator`? ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.sql; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.scala.DataStream; +import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Streaming sink File Compaction ITCase base, test checkpoint. + */ +public abstract class FileCompactionITCaseBase extends StreamingTestBase { + + @Rule + public Timeout timeoutPerTest = Timeout.seconds(60); + + private String resultPath; + + private List<Row> rows; + + @Before + public void init() throws IOException { + resultPath = tempFolder().newFolder().toURI().toString(); + clear(); + + env().setParallelism(3); + env().enableCheckpointing(100); + + rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i))); + } + + DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource( + new ParallelFiniteTestSource<>(rows), + new RowTypeInfo( + new TypeInformation[] {Types.INT, Types.STRING, Types.STRING}, + new String[] {"a", "b", "c"}))); + + tEnv().createTemporaryView("my_table", stream); + } + + @After + public void clear() throws IOException { + FileUtils.deleteDirectory(new File(URI.create(resultPath))); + } + + protected abstract String format(); + + @Test + public void testNonPartition() throws Exception { + tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")"); + tEnv().executeSql("insert into sink_table select * from my_table").await(); + + List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect()); + results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0))); + assertEquals(rows, results); + + File[] files = new File(URI.create(resultPath)).listFiles( + (dir, name) -> name.startsWith("compacted-part-")); + assertEquals(Arrays.toString(files), 1, files.length); + + String fileName = files[0].getName(); + assertTrue(fileName, fileName.startsWith("compacted-part-")); Review comment: Seems this is redundant because we list files with a filter that only returns files with this prefix ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java ########## @@ -161,20 +190,120 @@ public SinkRuntimeProvider 
getSinkRuntimeProvider(Context sinkContext) { .withOutputFileConfig(outputFileConfig) .withRollingPolicy(rollingPolicy); } - return createStreamingSink( - tableOptions, + + long bucketCheckInterval = tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis(); + + DataStream<PartitionCommitInfo> writerStream; + if (autoCompaction) { + long compactionSize = tableOptions + .getOptional(FileSystemOptions.COMPACTION_FILE_SIZE) + .orElse(tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE)) + .getBytes(); + + CompactReader.Factory<RowData> reader = createCompactReader(sinkContext).orElseThrow( + () -> new TableException("Please implement available reader for compaction:" + + " BulkFormat, FileInputFormat.")); + + writerStream = StreamingSink.compactionWriter( + dataStream, + bucketCheckInterval, + bucketsBuilder, + fsFactory, + path, + reader, + compactionSize); + } else { + writerStream = StreamingSink.writer( + dataStream, bucketCheckInterval, bucketsBuilder); + } + + return StreamingSink.sink( + writerStream, path, - partitionKeys, tableIdentifier, - overwrite, - dataStream, - bucketsBuilder, + partitionKeys, metaStoreFactory, fsFactory, - tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis()); + tableOptions); + } + } + + private Optional<CompactReader.Factory<RowData>> createCompactReader(Context context) { Review comment: `createCompactReaderFactory`? ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java ########## @@ -138,15 +154,28 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) { return dataStream.writeUsingOutputFormat(builder.build()) .setParallelism(dataStream.getParallelism()); } else { + if (overwrite) { + throw new IllegalStateException("Streaming mode not support overwrite."); + } + + boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION); Object writer = createWriter(sinkContext); + boolean isEncoder = writer instanceof Encoder; TableBucketAssigner assigner = new TableBucketAssigner(computer); TableRollingPolicy rollingPolicy = new TableRollingPolicy( - !(writer instanceof Encoder), + !isEncoder || autoCompaction, tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(), tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis()); + if (autoCompaction) { + outputFileConfig = OutputFileConfig.builder() Review comment: Can we have a method to create builder from an `OutputFileConfig` instance? To make sure we won't lose anything here. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/FileInputFormatReader.java ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.filesystem.stream.compact; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.core.fs.FileInputSplit; + +import java.io.IOException; + +/** + * The {@link CompactReader} to delegate {@link FileInputFormat}. + */ +public class FileInputFormatReader<T> implements CompactReader<T> { Review comment: FileInputFormatCompactReader? ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.stream.sql; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.scala.DataStream; +import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource; +import org.apache.flink.table.planner.runtime.utils.StreamingTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.FileUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Streaming sink File Compaction ITCase base, test checkpoint. 
+ */ +public abstract class FileCompactionITCaseBase extends StreamingTestBase { + + @Rule + public Timeout timeoutPerTest = Timeout.seconds(60); + + private String resultPath; + + private List<Row> rows; + + @Before + public void init() throws IOException { + resultPath = tempFolder().newFolder().toURI().toString(); + clear(); + + env().setParallelism(3); + env().enableCheckpointing(100); + + rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i))); + } + + DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource( + new ParallelFiniteTestSource<>(rows), + new RowTypeInfo( + new TypeInformation[] {Types.INT, Types.STRING, Types.STRING}, + new String[] {"a", "b", "c"}))); + + tEnv().createTemporaryView("my_table", stream); + } + + @After + public void clear() throws IOException { + FileUtils.deleteDirectory(new File(URI.create(resultPath))); + } + + protected abstract String format(); + + @Test + public void testNonPartition() throws Exception { + tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")"); + tEnv().executeSql("insert into sink_table select * from my_table").await(); + + List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect()); + results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0))); + assertEquals(rows, results); + + File[] files = new File(URI.create(resultPath)).listFiles( + (dir, name) -> name.startsWith("compacted-part-")); + assertEquals(Arrays.toString(files), 1, files.length); Review comment: Can we also verify there's no un-compacted files left? ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java ########## @@ -185,4 +185,17 @@ .defaultValue("_SUCCESS") .withDescription("The file name for success-file partition commit policy," + " default is '_SUCCESS'."); + + public static final ConfigOption<Boolean> AUTO_COMPACTION = + key("auto-compaction") + .booleanType() + .defaultValue(false) + .withDescription("Whether to enable automatic compaction in streaming sink or not." + Review comment: Let's have some more high-level explanations here, e.g. what will be compacted, when the compaction happens, whether files are usable before compaction, etc. 
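For instance, the description could be expanded roughly like this (just a sketch; the exact wording should reflect the final behavior, and it assumes compaction is triggered on checkpoint completion and that temporary files are not visible to readers until they have been compacted):

```java
// Sketch only: assumes the existing static import of ConfigOptions.key in FileSystemOptions.
public static final ConfigOption<Boolean> AUTO_COMPACTION =
        key("auto-compaction")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable automatic compaction in the streaming sink. " +
                        "If enabled, data is first written to temporary files. When a checkpoint " +
                        "completes, the temporary files produced by that checkpoint are compacted " +
                        "into larger files. Temporary files are invisible before compaction.");
```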
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java ########## @@ -127,108 +127,118 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) { } private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) { - RowDataPartitionComputer computer = new RowDataPartitionComputer( + if (sinkContext.isBounded()) { + return createBatchSink(dataStream, sinkContext); + } else { + if (overwrite) { + throw new IllegalStateException("Streaming mode not support overwrite."); + } + + return createStreamingSink(dataStream, sinkContext); + } + } + + private RowDataPartitionComputer partitionComputer() { + return new RowDataPartitionComputer( defaultPartName, schema.getFieldNames(), schema.getFieldDataTypes(), partitionKeys.toArray(new String[0])); + } - EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path); - OutputFileConfig outputFileConfig = OutputFileConfig.builder() + private DataStreamSink<RowData> createBatchSink( + DataStream<RowData> inputStream, Context sinkContext) { + FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>(); + builder.setPartitionComputer(partitionComputer()); + builder.setDynamicGrouped(dynamicGrouping); + builder.setPartitionColumns(partitionKeys.toArray(new String[0])); + builder.setFormatFactory(createOutputFormatFactory(sinkContext)); + builder.setMetaStoreFactory(new EmptyMetaStoreFactory(path)); + builder.setOverwrite(overwrite); + builder.setStaticPartitions(staticPartitions); + builder.setTempPath(toStagingPath()); + builder.setOutputFileConfig(OutputFileConfig.builder() .withPartPrefix("part-" + UUID.randomUUID().toString()) - .build(); + .build()); + return inputStream.writeUsingOutputFormat(builder.build()) + .setParallelism(inputStream.getParallelism()); + } + + private DataStreamSink<?> createStreamingSink( + DataStream<RowData> dataStream, Context sinkContext) { FileSystemFactory fsFactory = FileSystem::get; + RowDataPartitionComputer computer = partitionComputer(); - if (sinkContext.isBounded()) { - FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>(); - builder.setPartitionComputer(computer); - builder.setDynamicGrouped(dynamicGrouping); - builder.setPartitionColumns(partitionKeys.toArray(new String[0])); - builder.setFormatFactory(createOutputFormatFactory(sinkContext)); - builder.setMetaStoreFactory(metaStoreFactory); - builder.setFileSystemFactory(fsFactory); - builder.setOverwrite(overwrite); - builder.setStaticPartitions(staticPartitions); - builder.setTempPath(toStagingPath()); - builder.setOutputFileConfig(outputFileConfig); - return dataStream.writeUsingOutputFormat(builder.build()) - .setParallelism(dataStream.getParallelism()); + boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION); + Object writer = createWriter(sinkContext); + boolean isEncoder = writer instanceof Encoder; + TableBucketAssigner assigner = new TableBucketAssigner(computer); + TableRollingPolicy rollingPolicy = new TableRollingPolicy( + !isEncoder || autoCompaction, + tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(), + tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis()); + + String randomPrefix = "part-" + UUID.randomUUID().toString(); + OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder(); + fileNamingBuilder = autoCompaction ? 
+ fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix)) : + fileNamingBuilder.withPartPrefix(randomPrefix); Review comment: Why do we call `withPartPrefix` twice? ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java ########## @@ -127,108 +127,118 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) { } private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) { - RowDataPartitionComputer computer = new RowDataPartitionComputer( + if (sinkContext.isBounded()) { + return createBatchSink(dataStream, sinkContext); + } else { + if (overwrite) { + throw new IllegalStateException("Streaming mode not support overwrite."); + } + + return createStreamingSink(dataStream, sinkContext); + } + } + + private RowDataPartitionComputer partitionComputer() { + return new RowDataPartitionComputer( defaultPartName, schema.getFieldNames(), schema.getFieldDataTypes(), partitionKeys.toArray(new String[0])); + } - EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path); - OutputFileConfig outputFileConfig = OutputFileConfig.builder() + private DataStreamSink<RowData> createBatchSink( + DataStream<RowData> inputStream, Context sinkContext) { + FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>(); + builder.setPartitionComputer(partitionComputer()); + builder.setDynamicGrouped(dynamicGrouping); + builder.setPartitionColumns(partitionKeys.toArray(new String[0])); + builder.setFormatFactory(createOutputFormatFactory(sinkContext)); + builder.setMetaStoreFactory(new EmptyMetaStoreFactory(path)); + builder.setOverwrite(overwrite); + builder.setStaticPartitions(staticPartitions); + builder.setTempPath(toStagingPath()); + builder.setOutputFileConfig(OutputFileConfig.builder() .withPartPrefix("part-" + UUID.randomUUID().toString()) - .build(); + .build()); + return inputStream.writeUsingOutputFormat(builder.build()) + .setParallelism(inputStream.getParallelism()); + } + + private DataStreamSink<?> createStreamingSink( + DataStream<RowData> dataStream, Context sinkContext) { FileSystemFactory fsFactory = FileSystem::get; + RowDataPartitionComputer computer = partitionComputer(); - if (sinkContext.isBounded()) { - FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>(); - builder.setPartitionComputer(computer); - builder.setDynamicGrouped(dynamicGrouping); - builder.setPartitionColumns(partitionKeys.toArray(new String[0])); - builder.setFormatFactory(createOutputFormatFactory(sinkContext)); - builder.setMetaStoreFactory(metaStoreFactory); - builder.setFileSystemFactory(fsFactory); - builder.setOverwrite(overwrite); - builder.setStaticPartitions(staticPartitions); - builder.setTempPath(toStagingPath()); - builder.setOutputFileConfig(outputFileConfig); - return dataStream.writeUsingOutputFormat(builder.build()) - .setParallelism(dataStream.getParallelism()); + boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION); + Object writer = createWriter(sinkContext); + boolean isEncoder = writer instanceof Encoder; + TableBucketAssigner assigner = new TableBucketAssigner(computer); + TableRollingPolicy rollingPolicy = new TableRollingPolicy( + !isEncoder || autoCompaction, + tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(), + tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis()); + + String randomPrefix = "part-" + UUID.randomUUID().toString(); + 
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder(); + fileNamingBuilder = autoCompaction ? + fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix)) : + fileNamingBuilder.withPartPrefix(randomPrefix);

Review comment: Never mind... Just noticed it's for different cases.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]
