This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cbacbf26f0 [FLINK-32129][fs-connector] Use string array in PartitionCommitInfo
3cbacbf26f0 is described below

commit 3cbacbf26f09b5301b280beed4f78fc03d573d76
Author: Shammon FY <[email protected]>
AuthorDate: Fri May 19 15:31:35 2023 +0800

    [FLINK-32129][fs-connector] Use string array in PartitionCommitInfo
    
    Using generics will throw an exception when 'pipeline.generic-types' is disabled
    
    Close apache/flink#22609
---
 .../file/table/stream/PartitionCommitInfo.java     |  9 +++---
 .../file/table/stream/StreamingFileWriter.java     |  3 +-
 .../file/table/stream/compact/CompactOperator.java |  2 +-
 .../file/table/stream/PartitionCommitInfoTest.java | 37 ++++++++++++++++++++++
 .../file/table/stream/StreamingFileWriterTest.java |  3 +-
 .../table/stream/compact/CompactOperatorTest.java  |  2 +-
 6 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
index abc6ede1d51..5decc717c8e 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.file.table.stream;
 import org.apache.flink.annotation.Internal;
 
 import java.io.Serializable;
-import java.util.List;
 
 /**
  * The message sent by upstream.
@@ -37,12 +36,12 @@ public class PartitionCommitInfo implements Serializable {
     private long checkpointId;
     private int taskId;
     private int numberOfTasks;
-    private List<String> partitions;
+    private String[] partitions;
 
     public PartitionCommitInfo() {}
 
     public PartitionCommitInfo(
-            long checkpointId, int taskId, int numberOfTasks, List<String> partitions) {
+            long checkpointId, int taskId, int numberOfTasks, String[] partitions) {
         this.checkpointId = checkpointId;
         this.taskId = taskId;
         this.numberOfTasks = numberOfTasks;
@@ -73,11 +72,11 @@ public class PartitionCommitInfo implements Serializable {
         this.numberOfTasks = numberOfTasks;
     }
 
-    public List<String> getPartitions() {
+    public String[] getPartitions() {
         return partitions;
     }
 
-    public void setPartitions(List<String> partitions) {
+    public void setPartitions(String[] partitions) {
         this.partitions = partitions;
     }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
index eec6907bce3..7eb9faad119 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -152,6 +151,6 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
                                 checkpointId,
                                 getRuntimeContext().getIndexOfThisSubtask(),
                                 getRuntimeContext().getNumberOfParallelSubtasks(),
-                                new ArrayList<>(partitions))));
+                                partitions.toArray(new String[0]))));
     }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
index a99f39b57a5..fa51d4aa767 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
@@ -158,7 +158,7 @@ public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitIn
                                 checkpoint,
                                 getRuntimeContext().getIndexOfThisSubtask(),
                                 getRuntimeContext().getNumberOfParallelSubtasks(),
-                                new ArrayList<>(this.partitions))));
+                                this.partitions.toArray(new String[0]))));
         this.partitions.clear();
     }
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java
new file mode 100644
index 00000000000..1b2ee1f6fbf
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.stream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Tests for partition commit info. */
+public class PartitionCommitInfoTest {
+    @Test
+    public void testPartitionCommitSerializer() {
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.disableGenericTypes();
+        assertNotNull(
+                TypeInformation.of(PartitionCommitInfo.class).createSerializer(executionConfig));
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
index 513084767d3..2e394a1ff93 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
@@ -47,6 +47,7 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -327,7 +328,7 @@ class StreamingFileWriterTest {
     private static List<String> collect(
             OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness) {
         List<String> parts = new ArrayList<>();
-        harness.extractOutputValues().forEach(m -> parts.addAll(m.getPartitions()));
+        harness.extractOutputValues().forEach(m -> parts.addAll(Arrays.asList(m.getPartitions())));
         return parts;
     }
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
index 9d451594af5..4c1f5803431 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
@@ -76,7 +76,7 @@ class CompactOperatorTest extends AbstractCompactTestBase {
                     List<PartitionCommitInfo> outputs = harness.extractOutputValues();
                     assertThat(outputs).hasSize(1);
                     assertThat(outputs.get(0).getCheckpointId()).isEqualTo(1);
-                    assertThat(outputs.get(0).getPartitions()).isEqualTo(Arrays.asList("p0", "p1"));
+                    assertThat(outputs.get(0).getPartitions()).isEqualTo(new String[] {"p0", "p1"});
 
                     // check all compacted file generated
                     assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();

Reply via email to