luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1243092293
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -456,7 +456,8 @@ private DataStreamSink<?> createBatchCompactSink(
new PartitionCommitPolicyFactory(
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
- conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
+ conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+ conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));
Review Comment:
```suggestion
conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));
```
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -610,7 +611,8 @@ private DataStreamSink<Row> createBatchNoCompactSink(
new PartitionCommitPolicyFactory(
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
- conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME)));
+ conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+ conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS)));
Review Comment:
Ditto.
##########
docs/content.zh/docs/connectors/table/filesystem.md:
##########
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
<td>String</td>
<td> 实现 PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。</td>
</tr>
+ <tr>
+ <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+ <td style="word-wrap: break-word;">(无)</td>
+ <td>String</td>
+ <td> 传入 custom 提交策略类的字符串参数, 要求多个参数之间用分号分隔, 比如 'param1;param2',
Review Comment:
```suggestion
<td> 传入 custom 提交策略类构造器的参数, 多个参数之间用分号分隔, 比如 'param1;param2',
```
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
successFileName, fsSupplier.get());
case PartitionCommitPolicy.CUSTOM:
try {
- return (PartitionCommitPolicy)
- cl.loadClass(customClass).newInstance();
- } catch (ClassNotFoundException
- | IllegalAccessException
- | InstantiationException e) {
+ if (parameters != null && !parameters.isEmpty()) {
+ String[] paramStrings = parameters.toArray(new String[parameters.size()]);
Review Comment:
```suggestion
String[] paramStrings = parameters.toArray(new String[0]);
```
##########
docs/content/docs/connectors/table/filesystem.md:
##########
@@ -470,6 +470,14 @@ The partition commit policy defines what action is taken when partitions are com
<td>String</td>
<td>The partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy.</td>
</tr>
+ <tr>
+ <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The custom commit policy class can accept a string argument, which can include multiple arguments separated by semicolons. For example, 'param1;param2'. The string argument will be split into a list (['param1', 'param2']) and passed as constructor parameters to the custom commit policy class.</td>
Review Comment:
```suggestion
<td>The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class.</td>
```
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
successFileName, fsSupplier.get());
case PartitionCommitPolicy.CUSTOM:
try {
- return (PartitionCommitPolicy)
- cl.loadClass(customClass).newInstance();
- } catch (ClassNotFoundException
- | IllegalAccessException
- | InstantiationException e) {
+ if (parameters != null && !parameters.isEmpty()) {
Review Comment:
```suggestion
if (!CollectionUtil.isNullOrEmpty(parameters)) {
```
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
successFileName, fsSupplier.get());
case PartitionCommitPolicy.CUSTOM:
try {
- return (PartitionCommitPolicy)
- cl.loadClass(customClass).newInstance();
- } catch (ClassNotFoundException
- | IllegalAccessException
- | InstantiationException e) {
+ if (parameters != null && !parameters.isEmpty()) {
+ String[] paramStrings = parameters.toArray(new String[parameters.size()]);
+ Class<?>[] classes = new Class<?>[parameters.size()];
+ for (int i = 0; i < parameters.size(); i++) {
+ classes[i] = String.class;
+ }
+ return (PartitionCommitPolicy)
+ cl.loadClass(customClass)
+ .getConstructor(classes)
+ .newInstance(paramStrings);
Review Comment:
```suggestion
.newInstance((Object[]) paramStrings);
```
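To illustrate the shape I have in mind, here is a minimal standalone sketch rather than the actual factory code. `CustomPolicyLoader` and `DemoPolicy` are hypothetical names used only for this example; `DemoPolicy` stands in for a user class implementing `PartitionCommitPolicy`. It falls back to the no-arg constructor when no parameters are configured, and otherwise looks up an all-`String` constructor. The `(Object[])` cast makes it explicit that the array is the varargs argument list and avoids the compiler warning about an inexact varargs argument type.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink.
final class CustomPolicyLoader {

    /** Hypothetical stand-in for a user-defined commit policy class. */
    public static class DemoPolicy {
        public final String first;
        public final String second;

        // Default constructor, used when no parameters are configured.
        public DemoPolicy() {
            this("default", "default");
        }

        // Constructor that receives the configured string parameters.
        public DemoPolicy(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    static Object instantiate(ClassLoader cl, String className, List<String> parameters)
            throws ReflectiveOperationException {
        Class<?> clazz = cl.loadClass(className);
        if (parameters == null || parameters.isEmpty()) {
            // No parameters configured: use the no-arg constructor.
            return clazz.getDeclaredConstructor().newInstance();
        }
        // toArray(new String[0]) lets the collection allocate a correctly sized array.
        String[] paramStrings = parameters.toArray(new String[0]);
        // Every constructor parameter is expected to be a String.
        Class<?>[] types = new Class<?>[paramStrings.length];
        Arrays.fill(types, String.class);
        Constructor<?> ctor = clazz.getConstructor(types);
        // Explicit cast: pass the array as the argument list, one element per parameter.
        return ctor.newInstance((Object[]) paramStrings);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = CustomPolicyLoader.class.getClassLoader();
        String name = DemoPolicy.class.getName();
        DemoPolicy withParams =
                (DemoPolicy) instantiate(cl, name, Arrays.asList("param1", "param2"));
        DemoPolicy withDefaults =
                (DemoPolicy) instantiate(cl, name, Collections.<String>emptyList());
        System.out.println(withParams.first + "," + withParams.second);
        System.out.println(withDefaults.first + "," + withDefaults.second);
    }
}
```

Note that `getConstructor` throws `NoSuchMethodException` when the number of configured parameters does not match any public all-`String` constructor, so the factory would need to handle that case as well.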
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -137,6 +138,9 @@ public class HiveOptions {
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
+ public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =
Review Comment:
I don't think we need to introduce a new option in `HiveOptions`. We can reuse
`FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS`.
##########
docs/content.zh/docs/connectors/table/filesystem.md:
##########
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
<td>String</td>
<td> 实现 PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。</td>
</tr>
+ <tr>
+ <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+ <td style="word-wrap: break-word;">(无)</td>
+ <td>String</td>
+ <td> 传入 custom 提交策略类的字符串参数, 要求多个参数之间用分号分隔, 比如 'param1;param2',
+ 该字符串将被切分为列表(['param1','param2'])并传给 custom 提交策略类的构造器。该项为可选项。</td>
Review Comment:
```suggestion
该字符串将被切分为列表(['param1','param2'])并传给 custom 提交策略类的构造器。该项为可选项,不配置的话将使用类的默认构造方法。</td>
```
##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -258,6 +263,38 @@ void testPartitionPathNotExist() throws Exception {
assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
}
+ @Test
+ void testCustomCommitPolicyWithParameters() {
Review Comment:
Sorry, I meant an IT (integration test), not a UT (unit test). You can refer to this PR:
https://github.com/apache/flink/pull/17245
I think we can add the test in `FileSystemTableSinkTest`.
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -220,6 +221,15 @@ public class FileSystemConnectorOptions {
"The partition commit policy class for implement"
+ " PartitionCommitPolicy interface. Only work in custom commit policy");
+ public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =
+ key("sink.partition-commit.policy.class.parameters")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription(
+ "A semicolon-separated string of parameters for the custom commit policy class"
Review Comment:
We can reuse the doc text from above:
```
The parameters passed to the constructor of the custom commit policy, with
multiple parameters separated by semicolons, such as 'param1;param2'.
The configuration value will be split into a list (['param1', 'param2']) and
passed to the constructor of the custom commit policy class.
```
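For readers of the docs, the splitting described above can be sketched roughly as follows. This is a simplified plain-Java approximation, not Flink's actual list-option parser, which may additionally handle quoting and escaping. `SemicolonListDemo` and `splitParameters` are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of how a value such as 'param1;param2' becomes a list.
final class SemicolonListDemo {

    static List<String> splitParameters(String value) {
        List<String> result = new ArrayList<>();
        if (value == null || value.isEmpty()) {
            // Option not configured: no constructor parameters.
            return result;
        }
        for (String part : value.split(";")) {
            String trimmed = part.trim();
            // Skip empty segments, e.g. from a trailing semicolon.
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(splitParameters("param1;param2"));
    }
}
```

With this behaviour, an unset or empty option yields an empty list, which corresponds to the case where the default constructor of the custom policy class is used.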
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -137,6 +138,9 @@ public class HiveOptions {
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
+ public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =
Review Comment:
Btw, these two options in `HiveOptions` only alias the corresponding
`FileSystemConnectorOptions` options, so callers can use the filesystem options directly:
```
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
```
Could you please also remove them in this PR?
##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -258,6 +263,38 @@ void testPartitionPathNotExist() throws Exception {
assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
}
+ @Test
+ void testCustomCommitPolicyWithParameters() {
Review Comment:
Btw, I don't think we need the test here if we have an IT.