This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3cbacbf26f0 [FLINK-32129][fs-connector] Use string array in
PartitionCommitInfo
3cbacbf26f0 is described below
commit 3cbacbf26f09b5301b280beed4f78fc03d573d76
Author: Shammon FY <[email protected]>
AuthorDate: Fri May 19 15:31:35 2023 +0800
[FLINK-32129][fs-connector] Use string array in PartitionCommitInfo
The List<String> field is handled as a generic type, so creating the
serializer throws an exception when 'pipeline.generic-types' is disabled.
Close apache/flink#22609
---
.../file/table/stream/PartitionCommitInfo.java | 9 +++---
.../file/table/stream/StreamingFileWriter.java | 3 +-
.../file/table/stream/compact/CompactOperator.java | 2 +-
.../file/table/stream/PartitionCommitInfoTest.java | 37 ++++++++++++++++++++++
.../file/table/stream/StreamingFileWriterTest.java | 3 +-
.../table/stream/compact/CompactOperatorTest.java | 2 +-
6 files changed, 46 insertions(+), 10 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
index abc6ede1d51..5decc717c8e 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfo.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.file.table.stream;
import org.apache.flink.annotation.Internal;
import java.io.Serializable;
-import java.util.List;
/**
* The message sent by upstream.
@@ -37,12 +36,12 @@ public class PartitionCommitInfo implements Serializable {
private long checkpointId;
private int taskId;
private int numberOfTasks;
- private List<String> partitions;
+ private String[] partitions;
public PartitionCommitInfo() {}
public PartitionCommitInfo(
- long checkpointId, int taskId, int numberOfTasks, List<String> partitions) {
+ long checkpointId, int taskId, int numberOfTasks, String[] partitions) {
this.checkpointId = checkpointId;
this.taskId = taskId;
this.numberOfTasks = numberOfTasks;
@@ -73,11 +72,11 @@ public class PartitionCommitInfo implements Serializable {
this.numberOfTasks = numberOfTasks;
}
- public List<String> getPartitions() {
+ public String[] getPartitions() {
return partitions;
}
- public void setPartitions(List<String> partitions) {
+ public void setPartitions(String[] partitions) {
this.partitions = partitions;
}
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
index eec6907bce3..7eb9faad119 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingFileWriter.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -152,6 +151,6 @@ public class StreamingFileWriter<IN> extends
AbstractStreamingWriter<IN, Partiti
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
- new ArrayList<>(partitions))));
+ partitions.toArray(new String[0]))));
}
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
index a99f39b57a5..fa51d4aa767 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
@@ -158,7 +158,7 @@ public class CompactOperator<T> extends
AbstractStreamOperator<PartitionCommitIn
checkpoint,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
- new ArrayList<>(this.partitions))));
+ this.partitions.toArray(new String[0]))));
this.partitions.clear();
}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java
new file mode 100644
index 00000000000..1b2ee1f6fbf
--- /dev/null
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/PartitionCommitInfoTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.stream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Tests for partition commit info. */
+public class PartitionCommitInfoTest {
+ @Test
+ public void testPartitionCommitSerializer() {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableGenericTypes();
+ assertNotNull(
+ TypeInformation.of(PartitionCommitInfo.class).createSerializer(executionConfig));
+ }
+}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
index 513084767d3..2e394a1ff93 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
@@ -47,6 +47,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -327,7 +328,7 @@ class StreamingFileWriterTest {
private static List<String> collect(
OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo>
harness) {
List<String> parts = new ArrayList<>();
- harness.extractOutputValues().forEach(m -> parts.addAll(m.getPartitions()));
+ harness.extractOutputValues().forEach(m -> parts.addAll(Arrays.asList(m.getPartitions())));
return parts;
}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
index 9d451594af5..4c1f5803431 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactOperatorTest.java
@@ -76,7 +76,7 @@ class CompactOperatorTest extends AbstractCompactTestBase {
List<PartitionCommitInfo> outputs =
harness.extractOutputValues();
assertThat(outputs).hasSize(1);
assertThat(outputs.get(0).getCheckpointId()).isEqualTo(1);
- assertThat(outputs.get(0).getPartitions()).isEqualTo(Arrays.asList("p0", "p1"));
+ assertThat(outputs.get(0).getPartitions()).isEqualTo(new String[] {"p0", "p1"});
// check all compacted file generated
assertThat(fs.exists(new Path(folder,
"compacted-f0"))).isTrue();