wuchong commented on code in PR #20469:
URL: https://github.com/apache/flink/pull/20469#discussion_r942380136
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -95,6 +95,20 @@ public class HiveOptions {
+ " When the value is over estimated,
Flink will tend to pack Hive's data into less splits, which will be helpful
when Hive's table contains many small files."
+ " And vice versa. It only works for the
Hive table stored as ORC format.");
+ public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND
=
+ key("sink.partition-commit.policy.kind")
+ .stringType()
+ .defaultValue("metastore,success-file")
+ .withDescription(
+ "Policy to commit a partition is to notify the
downstream"
+ + " application that the partition has
finished writing, the partition"
+ + " is ready to be read."
+ + " metastore: add partition to metastore.
"
+ + " success-file: add '_success' file to
directory."
Review Comment:
```
success-file: add a success file to the partition directory. The success
file name can be configured by the 'sink.partition-commit.success-file.name'
option.
```
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -95,6 +95,20 @@ public class HiveOptions {
+ " When the value is over estimated,
Flink will tend to pack Hive's data into less splits, which will be helpful
when Hive's table contains many small files."
+ " And vice versa. It only works for the
Hive table stored as ORC format.");
+ public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND
=
Review Comment:
Add a comment explaining why we need to redefine the same configuration
option here, e.g.,
```java
/**
 * Hive users usually commit a partition by adding it to the metastore and writing a
 * _SUCCESS file. That's why we define the same option as {@link
 * FileSystemConnectorOptions#SINK_PARTITION_COMMIT_POLICY_KIND} with a different default
 * value.
 */
```
##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -173,7 +206,14 @@ void testEmptyPartition() throws Exception {
assertThat(emptyPartitionFile).exists();
assertThat(emptyPartitionFile).isDirectory();
- assertThat(emptyPartitionFile).isEmptyDirectory();
+ files =
+ FileUtils.listFilesInDirectory(
+ Paths.get(emptyPartitionFile.getPath()),
+ (path) ->
+ !path.toFile().isHidden()
+ &&
!path.toFile().getName().startsWith("_"))
Review Comment:
I think this can be simplified into
```java
assertThat(emptyPartitionFile)
        .isDirectoryNotContaining(file -> !file.getName().equals("_SUCCESS"));
```
There shouldn't be hidden files in the directory, right?
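
To make the intent of that assertion concrete, here is a minimal plain-JDK sketch of the same check: the directory passes only if no direct child matches the predicate. The class and helper names (`SuccessFileOnlySketch`, `directoryNotContaining`) are hypothetical and are not part of AssertJ or Flink; the sketch only approximates what the suggested assertion verifies.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.function.Predicate;

public class SuccessFileOnlySketch {

    /** Returns true if no direct child of {@code dir} matches {@code filter}. */
    static boolean directoryNotContaining(File dir, Predicate<File> filter) {
        File[] children = dir.listFiles();
        if (children == null) {
            throw new IllegalArgumentException(dir + " is not a readable directory");
        }
        for (File child : children) {
            if (filter.test(child)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Simulate an "empty" partition directory that only holds the success marker.
        File partitionDir = Files.createTempDirectory("partition").toFile();
        Files.createFile(partitionDir.toPath().resolve("_SUCCESS"));

        Predicate<File> notSuccessFile = file -> !file.getName().equals("_SUCCESS");
        System.out.println(directoryNotContaining(partitionDir, notSuccessFile)); // true

        // Any other file, such as a data file, makes the check fail.
        Files.createFile(partitionDir.toPath().resolve("part-0"));
        System.out.println(directoryNotContaining(partitionDir, notSuccessFile)); // false
    }
}
```

Note that a hidden file such as `.part-0.crc` would also fail this check, which is why the question about hidden files matters.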
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -337,6 +340,12 @@ private DataStreamSink<Row> createBatchSink(
builder.setTempPath(
new
org.apache.flink.core.fs.Path(toStagingDir(stagingParentDir, jobConf)));
builder.setOutputFileConfig(fileNaming);
+ builder.setIdentifier(identifier);
+
builder.setPolicyKind(conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND));
+ builder.setCustomClass(
+
conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS));
+ builder.setSuccessFileName(
+
conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
Review Comment:
Please use only options from `HiveOptions` in the Hive connector. Using
`FileSystemConnectorOptions` directly is error-prone, e.g., an option may have
an unexpected default value. You can reuse the options by referencing their
definitions from `FileSystemConnectorOptions`, for example, in `HiveOptions.java`:
```java
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
```
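
The difference between redefining an option and reusing it by reference can be sketched without Flink. Everything below is a simplified, hypothetical stand-in: `Option`, `FileOptions`, and `HiveLikeOptions` are not Flink classes, and the file-side defaults are assumptions chosen for illustration. Only the Hive-side default `metastore,success-file` comes from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionReuseSketch {

    /** Hypothetical, simplified stand-in for a typed config option. */
    static final class Option {
        final String key;
        final String defaultValue;

        Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Stand-in for the file system connector's options (defaults are assumed). */
    static final class FileOptions {
        static final Option POLICY_KIND =
                new Option("sink.partition-commit.policy.kind", null);
        static final Option SUCCESS_FILE_NAME =
                new Option("sink.partition-commit.success-file.name", "_SUCCESS");
    }

    /** Stand-in for the Hive connector's options. */
    static final class HiveLikeOptions {
        // Redefined: same key, but a Hive-specific default.
        static final Option POLICY_KIND =
                new Option("sink.partition-commit.policy.kind", "metastore,success-file");
        // Reused by reference: key and default stay identical to the file connector's.
        static final Option SUCCESS_FILE_NAME = FileOptions.SUCCESS_FILE_NAME;
    }

    /** Looks up an option, falling back to that option's own default. */
    static String get(Map<String, String> conf, Option option) {
        return conf.getOrDefault(option.key, option.defaultValue);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>(); // the user set nothing

        // The same key resolves differently depending on which definition is read.
        System.out.println(get(conf, HiveLikeOptions.POLICY_KIND)); // metastore,success-file
        System.out.println(get(conf, FileOptions.POLICY_KIND));     // null

        // The aliased option is the very same object, so it cannot drift.
        System.out.println(HiveLikeOptions.SUCCESS_FILE_NAME == FileOptions.SUCCESS_FILE_NAME); // true
    }
}
```

Reading every option through one class keeps the caller from accidentally picking up the other definition's default.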
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java:
##########
@@ -58,7 +59,9 @@ class FileSystemCommitter implements Serializable {
private final boolean isToLocal;
private final Path tmpPath;
private final int partitionColumnSize;
+ private final ObjectIdentifier identifier;
private final LinkedHashMap<String, String> staticPartitions;
+ private final List<PartitionCommitPolicy> policies;
Review Comment:
`PartitionCommitPolicy` is not serializable, but `FileSystemCommitter`
implements `Serializable`. I checked that `FileSystemCommitter` is only used as
a local variable. Therefore, `FileSystemCommitter` doesn't need to implement
the `Serializable` interface.
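
As an illustration of why this combination is risky, the following self-contained sketch shows that a class declaring `Serializable` still fails at runtime once it holds a non-serializable element. `Policy` and `Committer` are hypothetical stand-ins, not the Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializableFieldSketch {

    /** Hypothetical stand-in for a policy type that is NOT Serializable. */
    static class Policy {
        final String name;

        Policy(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for a committer that declares Serializable but holds policies. */
    static class Committer implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<Policy> policies = new ArrayList<>();
    }

    /** Returns true if Java serialization of {@code obj} succeeds. */
    static boolean canSerialize(Object obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Committer committer = new Committer();
        System.out.println(canSerialize(committer)); // true: the list is still empty

        committer.policies.add(new Policy("metastore"));
        System.out.println(canSerialize(committer)); // false: Policy is not Serializable
    }
}
```

The compiler accepts the class either way; the failure only appears when an instance is actually serialized, so dropping the unused interface removes a misleading promise.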