lsyldliu commented on code in PR #21703:
URL: https://github.com/apache/flink/pull/21703#discussion_r1080776842


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/BatchSink.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.FileSystemOutputFormat;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactCoordinator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchPartitionCommitterSink;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+/** Helper for creating batch file sink. */
+@Internal
+public class BatchSink {
+    private BatchSink() {}
+
+    public static DataStreamSink<Row> createBatchNoAutoCompactSink(
+            DataStream<RowData> dataStream,
+            DynamicTableSink.DataStructureConverter converter,
+            FileSystemOutputFormat<Row> fileSystemOutputFormat,
+            final int parallelism)
+            throws IOException {
+        return dataStream
+                .map((MapFunction<RowData, Row>) value -> (Row) 
converter.toExternal(value))
+                .setParallelism(parallelism)
+                .writeUsingOutputFormat(fileSystemOutputFormat)
+                .setParallelism(parallelism);
+    }
+
+    public static <T> DataStreamSink<?> createBatchCompactSink(
+            DataStream<CoordinatorInput> dataStream,
+            StreamingFileSink.BucketsBuilder<
+                            T, String, ? extends 
StreamingFileSink.BucketsBuilder<T, String, ?>>
+                    builder,
+            CompactReader.Factory<T> readFactory,
+            FileSystemFactory fsFactory,
+            TableMetaStoreFactory metaStoreFactory,
+            PartitionCommitPolicyFactory partitionCommitPolicyFactory,
+            String[] partitionColumns,
+            LinkedHashMap<String, String> staticPartitionSpec,
+            Path tmpPath,
+            ObjectIdentifier identifier,
+            final long compactAverageSize,
+            final long compactTargetSize,
+            boolean isToLocal,
+            boolean overwrite,
+            final int compactParallelism)
+            throws IOException {

Review Comment:
   Ditto



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/BatchSink.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.FileSystemOutputFormat;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactCoordinator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchPartitionCommitterSink;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+/** Helper for creating batch file sink. */
+@Internal
+public class BatchSink {
+    private BatchSink() {}
+
+    public static DataStreamSink<Row> createBatchNoAutoCompactSink(

Review Comment:
   I think `createBatchSink` is enough.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -351,15 +365,211 @@ private DataStreamSink<?> consume(
         }
     }
 
-    private DataStreamSink<Row> createBatchSink(
+    private DataStreamSink<?> createBatchSink(
+            DataStream<RowData> dataStream,
+            DataStructureConverter converter,
+            HiveWriterFactory recordWriterFactory,
+            TableMetaStoreFactory metaStoreFactory,
+            OutputFileConfig.OutputFileConfigBuilder fileConfigBuilder,
+            String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
+            boolean isToLocal,
+            boolean overwrite,
+            int sinkParallelism)
+            throws IOException {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        boolean autoCompaction = 
conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION);
+        if (autoCompaction) {
+            if (batchShuffleMode != BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Auto compaction for Hive sink in batch mode 
is not supported when the %s is not %s.",
+                                ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+            }
+            Optional<Integer> compactParallelismOptional =
+                    
conf.getOptional(FileSystemConnectorOptions.COMPACTION_PARALLELISM);
+            int compactParallelism = 
compactParallelismOptional.orElse(sinkParallelism);
+            return createBatchCompactSink(
+                    dataStream,
+                    converter,
+                    recordWriterFactory,
+                    metaStoreFactory,
+                    fileConfigBuilder
+                            .withPartPrefix(
+                                    BatchCompactOperator.UNCOMPACTED_PREFIX
+                                            + "part-"
+                                            + UUID.randomUUID())
+                            .build(),
+                    stagingParentDir,
+                    sd,
+                    tableProps,
+                    isToLocal,
+                    overwrite,
+                    sinkParallelism,
+                    compactParallelism);
+        } else {
+            return createBatchNoCompactSink(
+                    dataStream,
+                    converter,
+                    recordWriterFactory,
+                    metaStoreFactory,
+                    fileConfigBuilder.build(),
+                    stagingParentDir,
+                    isToLocal,
+                    sinkParallelism);
+        }
+    }
+
+    private DataStreamSink<?> createBatchCompactSink(
             DataStream<RowData> dataStream,
             DataStructureConverter converter,
             HiveWriterFactory recordWriterFactory,
             TableMetaStoreFactory metaStoreFactory,
             OutputFileConfig fileNaming,
             String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
             boolean isToLocal,
-            final int parallelism)
+            boolean overwrite,
+            final int sinkParallelism,
+            final int compactParallelism)
+            throws IOException {
+        String[] partitionColumns = getPartitionKeyArray();
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        HadoopFileSystemFactory fsFactory = fsFactory();
+        org.apache.flink.core.fs.Path tmpPath =
+                new 
org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf));
+
+        PartitionCommitPolicyFactory partitionCommitPolicyFactory =
+                new PartitionCommitPolicyFactory(
+                        
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
+                        
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
+                        
conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
+
+        org.apache.flink.core.fs.Path path = new 
org.apache.flink.core.fs.Path(sd.getLocation());
+        BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, 
?>> builder =
+                getBucketsBuilder(path, recordWriterFactory, sd, fileNaming, 
conf);
+
+        CompactReader.Factory<RowData> readerFactory = 
createCompactReaderFactory(sd, tableProps);
+
+        HiveOutputFormatFactory outputFormatFactory =
+                new HiveOutputFormatFactory(recordWriterFactory);
+        HiveRowPartitionComputer partitionComputer =
+                new HiveRowPartitionComputer(
+                        hiveShim,
+                        JobConfUtils.getDefaultPartitionName(jobConf),
+                        tableSchema.getFieldNames(),
+                        tableSchema.getFieldDataTypes(),
+                        partitionColumns);
+
+        DataStream<CoordinatorInput> writerDataStream =
+                dataStream
+                        .map((MapFunction<RowData, Row>) value -> (Row) 
converter.toExternal(value))

Review Comment:
   ```suggestion
                           .map(value -> (Row) converter.toExternal(value))
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -351,15 +365,211 @@ private DataStreamSink<?> consume(
         }
     }
 
-    private DataStreamSink<Row> createBatchSink(
+    private DataStreamSink<?> createBatchSink(
+            DataStream<RowData> dataStream,
+            DataStructureConverter converter,
+            HiveWriterFactory recordWriterFactory,
+            TableMetaStoreFactory metaStoreFactory,
+            OutputFileConfig.OutputFileConfigBuilder fileConfigBuilder,
+            String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
+            boolean isToLocal,
+            boolean overwrite,
+            int sinkParallelism)
+            throws IOException {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        boolean autoCompaction = 
conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION);
+        if (autoCompaction) {
+            if (batchShuffleMode != BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Auto compaction for Hive sink in batch mode 
is not supported when the %s is not %s.",
+                                ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+            }
+            Optional<Integer> compactParallelismOptional =
+                    
conf.getOptional(FileSystemConnectorOptions.COMPACTION_PARALLELISM);
+            int compactParallelism = 
compactParallelismOptional.orElse(sinkParallelism);
+            return createBatchCompactSink(
+                    dataStream,
+                    converter,
+                    recordWriterFactory,
+                    metaStoreFactory,
+                    fileConfigBuilder
+                            .withPartPrefix(
+                                    BatchCompactOperator.UNCOMPACTED_PREFIX
+                                            + "part-"
+                                            + UUID.randomUUID())
+                            .build(),
+                    stagingParentDir,
+                    sd,
+                    tableProps,
+                    isToLocal,
+                    overwrite,
+                    sinkParallelism,
+                    compactParallelism);
+        } else {
+            return createBatchNoCompactSink(
+                    dataStream,
+                    converter,
+                    recordWriterFactory,
+                    metaStoreFactory,
+                    fileConfigBuilder.build(),
+                    stagingParentDir,
+                    isToLocal,
+                    sinkParallelism);
+        }
+    }
+
+    private DataStreamSink<?> createBatchCompactSink(
             DataStream<RowData> dataStream,
             DataStructureConverter converter,
             HiveWriterFactory recordWriterFactory,
             TableMetaStoreFactory metaStoreFactory,
             OutputFileConfig fileNaming,
             String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
             boolean isToLocal,
-            final int parallelism)
+            boolean overwrite,
+            final int sinkParallelism,
+            final int compactParallelism)
+            throws IOException {
+        String[] partitionColumns = getPartitionKeyArray();
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        HadoopFileSystemFactory fsFactory = fsFactory();
+        org.apache.flink.core.fs.Path tmpPath =
+                new 
org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf));
+
+        PartitionCommitPolicyFactory partitionCommitPolicyFactory =
+                new PartitionCommitPolicyFactory(
+                        
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),

Review Comment:
   We should also check the commit policy is not null when table has partition 
key like streaming sink?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/BatchSink.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.FileSystemOutputFormat;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactCoordinator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchPartitionCommitterSink;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+/** Helper for creating batch file sink. */
+@Internal
+public class BatchSink {
+    private BatchSink() {}
+
+    public static DataStreamSink<Row> createBatchNoAutoCompactSink(
+            DataStream<RowData> dataStream,
+            DynamicTableSink.DataStructureConverter converter,
+            FileSystemOutputFormat<Row> fileSystemOutputFormat,
+            final int parallelism)
+            throws IOException {

Review Comment:
   Here is no need to throw exception.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -351,15 +365,211 @@ private DataStreamSink<?> consume(
         }
     }
 
-    private DataStreamSink<Row> createBatchSink(
+    private DataStreamSink<?> createBatchSink(
+            DataStream<RowData> dataStream,
+            DataStructureConverter converter,
+            HiveWriterFactory recordWriterFactory,
+            TableMetaStoreFactory metaStoreFactory,
+            OutputFileConfig.OutputFileConfigBuilder fileConfigBuilder,
+            String stagingParentDir,
+            StorageDescriptor sd,
+            Properties tableProps,
+            boolean isToLocal,
+            boolean overwrite,
+            int sinkParallelism)
+            throws IOException {
+        org.apache.flink.configuration.Configuration conf =
+                new org.apache.flink.configuration.Configuration();
+        catalogTable.getOptions().forEach(conf::setString);
+        boolean autoCompaction = 
conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION);
+        if (autoCompaction) {
+            if (batchShuffleMode != BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {

Review Comment:
   I think in pipeline shuffle mode, auto compaction can also work.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/BatchSink.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.FileSystemOutputFormat;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactCoordinator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
+import 
org.apache.flink.connector.file.table.batch.compact.BatchPartitionCommitterSink;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import 
org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+
+/** Helper for creating batch file sink. */
+@Internal
+public class BatchSink {
+    private BatchSink() {}
+
+    public static DataStreamSink<Row> createBatchNoAutoCompactSink(
+            DataStream<RowData> dataStream,
+            DynamicTableSink.DataStructureConverter converter,
+            FileSystemOutputFormat<Row> fileSystemOutputFormat,
+            final int parallelism)
+            throws IOException {
+        return dataStream
+                .map((MapFunction<RowData, Row>) value -> (Row) 
converter.toExternal(value))

Review Comment:
   ```suggestion
                   .map(value -> ((Row) converter.toExternal(value)))
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -140,6 +140,15 @@ public class HiveOptions {
     public static final ConfigOption<String> 
SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
 
+    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE =
+            key("compaction.small-files.avg-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(16))

Review Comment:
   How do you get this default value? Is it reasonable for user?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for Hive table compaction in batch mode. */
+public class HiveTableCompactSinkITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER = new 
MiniClusterExtension();
+
+    private TableEnvironment tableEnv;
+    private HiveCatalog hiveCatalog;
+    private String warehouse;
+
+    @BeforeEach
+    public void setUp() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+        warehouse = 
hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+        tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Test
+    public void testNoCompaction() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'compaction.small-files.avg-size' = '1b', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism 
= 8
+                        + ")");
+        tableEnv.executeSql(
+                        "insert into src values ('k1', 'v1'), ('k2', 'v2'),"
+                                + "('k3', 'v3'), ('k4', 'v4')")
+                .await();
+
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src"));
+        // auto compaction enabled, but the files' average size isn't less 
than 1b,  so the files
+        // num should be 4
+        assertThat(files).hasSize(4);
+        List<String> result = toSortedResult(tableEnv.executeSql("select * 
from src"));
+        assertThat(result.toString()).isEqualTo("[+I[k1, v1], +I[k2, v2], 
+I[k3, v3], +I[k4, v4]]");
+    }
+
+    @Test
+    public void testCompactNonPartitionedTable() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism 
= 8
+                        + ")");
+        tableEnv.executeSql(
+                        "insert into src values ('k1', 'v1'), ('k2', 'v2'),"
+                                + "('k3', 'v3'), ('k4', 'v4')")
+                .await();
+
+        // auto compaction is enabled, so all the files should be merged be 
one file
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src"));
+        assertThat(files).hasSize(1);
+        List<String> result = toSortedResult(tableEnv.executeSql("select * 
from src"));
+        assertThat(result.toString()).isEqualTo("[+I[k1, v1], +I[k2, v2], 
+I[k3, v3], +I[k4, v4]]");
+    }
+
+    @Test
+    public void testCompactPartitionedTable() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") partitioned by (p1 int,p2 string) TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'sink.parallelism' = '8'" // set sink parallelism 
= 8
+                        + ")");
+
+        // test compaction for static partition
+        tableEnv.executeSql(
+                        "insert into src partition (p1=0,p2='static') values 
(1,'a'),(2,'b'),(3,'c')")
+                .await();
+        // auto compaction is enabled, so all the files in same partition 
should be merged be one
+        // file
+        List<Path> files = listDataFiles(Paths.get(warehouse, 
"src/p1=0/p2=static"));
+        assertThat(files).hasSize(1);
+        // verify the result
+        List<String> result = toSortedResult(tableEnv.executeSql("select * 
from src"));
+        assertThat(result.toString())
+                .isEqualTo("[+I[1, a, 0, static], +I[2, b, 0, static], +I[3, 
c, 0, static]]");
+
+        // test compaction for dynamic partition
+        tableEnv.executeSql(
+                        "insert into src partition (p1=0,p2) values 
(1,'a','d1'),"
+                                + " (2,'b','d2'), (3,'c','d1'), (4,'d','d2')")
+                .await();
+        // auto compaction is enabled, so all the files in same partition 
should be merged be one
+        // file
+        files = listDataFiles(Paths.get(warehouse, "src/p1=0/p2=d1"));
+        assertThat(files).hasSize(1);
+        files = listDataFiles(Paths.get(warehouse, "src/p1=0/p2=d2"));
+        assertThat(files).hasSize(1);
+        // verify the result
+        result = toSortedResult(tableEnv.executeSql("select * from src"));
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[1, a, 0, d1], +I[1, a, 0, static], +I[2, b, 0, 
d2],"
+                                + " +I[2, b, 0, static], +I[3, c, 0, d1], 
+I[3, c, 0, static], +I[4, d, 0, d2]]");
+    }
+
+    @Test
+    public void testConditionalCompact() throws Exception {
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") partitioned by (p int) TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'compaction.small-files.avg-size' = '9b', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism 
= 8
+                        + ")");
+
+        tableEnv.executeSql(
+                        "insert into src values ('k1', 'v1', 1), ('k2', 'v2', 
1),"
+                                + "('k3', 'v3', 2), ('k4', 'v4', 2), ('k5', 
'v5', 1)")
+                .await();
+
+        // one row is 6 bytes, so the partition "p=2" will contain two files, 
one of which only
+        // contain one row, the average size is 6 bytes. so the files should 
be merged to one single
+        // file.
+        List<Path> files = listDataFiles(Paths.get(warehouse, "src/p=2"));
+        assertThat(files).hasSize(1);
+
+        // the partition "p=1" will contain two files, one contain two rows, 
and the other contain
+        // one row. the average size is 9 bytes, so the files shouldn't be 
merged
+        files = listDataFiles(Paths.get(warehouse, "src/p=1"));
+        assertThat(files).hasSize(2);
+
+        List<String> result = toSortedResult(tableEnv.executeSql("select * 
from src"));
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[k1, v1, 1], +I[k2, v2, 1], +I[k3, v3, 2], +I[k4, 
v4, 2], +I[k5, v5, 1]]");
+    }
+
+    @Test
+    public void testCompactInNoBlockingMode() throws Exception {
+        // create a environment with non-blocking mode
+        tableEnv = createNoBlockingModeTableEnv();
+        tableEnv.executeSql(
+                "CREATE TABLE src ("
+                        + " key string,"
+                        + " value string"
+                        + ") partitioned by (p int) TBLPROPERTIES ("
+                        + " 'auto-compaction' = 'true', "
+                        + " 'compaction.small-files.avg-size' = '9b', "
+                        + " 'sink.parallelism' = '4'" // set sink parallelism 
= 8
+                        + ")");
+        assertThatThrownBy(
+                        () ->
+                                tableEnv.executeSql(
+                                        "insert into src values ('k1', 'v1', 
1), ('k2', 'v2', 1),"
+                                                + "('k3', 'v3', 2), ('k4', 
'v4', 2), ('k5', 'v5', 1)"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        String.format(
+                                "Auto compaction for Hive sink in batch mode 
is not supported when the %s is not %s.",
+                                ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+    }
+
+    private TableEnvironment createNoBlockingModeTableEnv() {
+        EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
+        settings.getConfiguration()
+                .set(ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_PIPELINED);

Review Comment:
   I think this option is job level, so we don't need to create a new 
TableEnvironment.
   ```
           tableEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, 
BatchShuffleMode.ALL_EXCHANGES_PIPELINED);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to