This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3fc5917063 [Improve][connector-file-base] Improve the write performance of ORC file format (#10114)
3fc5917063 is described below
commit 3fc59170630e6c48e20cf5e77a151377e99f46e9
Author: shfshihuafeng <[email protected]>
AuthorDate: Fri Dec 5 01:19:34 2025 +0800
[Improve][connector-file-base] Improve the write performance of ORC file format (#10114)
---
.../file/sink/writer/OrcWriteStrategy.java | 30 +++++-
.../file/writer/OrcWriteStrategyTest.java | 109 +++++++++++++++++++++
2 files changed, 135 insertions(+), 4 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 366c9bb82a..339067e1c8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -61,10 +61,12 @@ import java.util.Map;
public class OrcWriteStrategy extends AbstractWriteStrategy<Writer> {
private final LinkedHashMap<String, Writer> beingWrittenWriter;
+ private final LinkedHashMap<String, VectorizedRowBatch>
vectorizedRowBatches; /* Pending ORC row batch per file path being written; a batch is flushed to its writer when it becomes full or right before that writer is closed. */
public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
super(fileSinkConfig);
this.beingWrittenWriter = new LinkedHashMap<>();
+ this.vectorizedRowBatches = new LinkedHashMap<>(); /* Starts empty; batches are created lazily per file path by getOrCreateVectorizedRowBatch. */
}
@Override
@@ -72,8 +74,8 @@ public class OrcWriteStrategy extends
AbstractWriteStrategy<Writer> {
super.write(seaTunnelRow);
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
Writer writer = getOrCreateOutputStream(filePath);
- TypeDescription schema = buildSchemaWithRowType();
- VectorizedRowBatch rowBatch = schema.createRowBatch();
+ VectorizedRowBatch rowBatch = getOrCreateVectorizedRowBatch(filePath);
+
int i = 0;
int row = rowBatch.size++;
for (Integer index : sinkColumnsIndexInRow) {
@@ -83,8 +85,11 @@ public class OrcWriteStrategy extends
AbstractWriteStrategy<Writer> {
i++;
}
try {
- writer.addRowBatch(rowBatch);
- rowBatch.reset();
+ if (rowBatch.size == rowBatch.getMaxSize()) {
+ writer.addRowBatch(rowBatch);
+ rowBatch.reset();
+ }
+
} catch (IOException e) {
throw CommonError.fileOperationFailed("OrcFile", "write",
filePath, e);
}
@@ -95,6 +100,11 @@ public class OrcWriteStrategy extends
AbstractWriteStrategy<Writer> {
this.beingWrittenWriter.forEach(
(k, v) -> {
try {
+ VectorizedRowBatch rowBatch =
getOrCreateVectorizedRowBatch(k);
+ if (rowBatch.size > 0) {
+ v.addRowBatch(rowBatch);
+ rowBatch.reset();
+ }
v.close();
} catch (IOException e) {
String errorMsg =
@@ -106,9 +116,21 @@ public class OrcWriteStrategy extends
AbstractWriteStrategy<Writer> {
}
needMoveFiles.put(k, getTargetLocation(k));
});
+ this.vectorizedRowBatches.clear();
this.beingWrittenWriter.clear();
}
+    /**
+     * Returns the row batch buffered for {@code filePath}, creating it from the current sink
+     * schema on first use. {@code write} flushes the batch to the ORC writer once it is full,
+     * and any remaining rows are flushed before the writer is closed.
+     */
+    private VectorizedRowBatch getOrCreateVectorizedRowBatch(@NonNull String filePath) {
+        // computeIfAbsent keeps exactly one pending batch per file path being written.
+        return this.vectorizedRowBatches.computeIfAbsent(
+                filePath, key -> buildSchemaWithRowType().createRowBatch());
+    }
+
@Override
public Writer getOrCreateOutputStream(@NonNull String filePath) {
Writer writer = this.beingWrittenWriter.get(filePath);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
new file mode 100644
index 0000000000..7dbc8bd9de
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+@Slf4j
+public class OrcWriteStrategyTest { /* Writes rows through OrcWriteStrategy, then reads the produced ORC file back with OrcReadStrategy and checks the field type and the row count. */
+ private static final String TMP_PATH =
"file:///tmp/seatunnel/orc/batch/test"; /* NOTE(review): fixed directory that is not cleaned before or after the test; if an earlier run left files here, the single-file assertion below may fail. Consider a per-run temp dir. */
+ private static final int ORC_WRITE_NUMBER = 2000; /* Presumably chosen to exceed one ORC row batch (assumed default of 1024 rows; confirm) so both the full-batch flush and the final partial-batch flush are exercised. */
+
+ @DisabledOnOs(OS.WINDOWS)
+ @Test
+ public void testOrcWriteWithBatch() throws Exception {
+ Map<String, Object> writeConfig = new HashMap<>();
+ writeConfig.put("tmp_path", TMP_PATH);
+ writeConfig.put("path", "file:///tmp/seatunnel/orc/batch");
+ writeConfig.put("file_format_type", FileFormat.ORC.name());
+
+ SeaTunnelRowType writeRowType =
+ new SeaTunnelRowType(
+ new String[] {"f1_text"},
+ new SeaTunnelDataType[] {
+ BasicType.STRING_TYPE,
+ });
+ FileSinkConfig writeSinkConfig =
+ new FileSinkConfig(ConfigFactory.parseMap(writeConfig),
writeRowType);
+ OrcWriteStrategy writeStrategy = new OrcWriteStrategy(writeSinkConfig);
+
+ OrcReadStrategyTest.LocalConf hadoopConf =
+ new OrcReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT); /* Hadoop conf helper reused from OrcReadStrategyTest, initialised with Hadoop's FS_DEFAULT_NAME_DEFAULT. */
+ writeStrategy.setCatalogTable(
+ CatalogTableUtil.getCatalogTable("test", null, null, "test",
writeRowType));
+ writeStrategy.init(hadoopConf, "test1", "test1", 0);
+ writeStrategy.beginTransaction(1L);
+ for (int i = 0; i < ORC_WRITE_NUMBER; i++) { /* Rows are buffered in a per-file row batch; only full batches are handed to the ORC writer inside write(). */
+ writeStrategy.write(new SeaTunnelRow(new Object[] {"test_" + i}));
+ }
+ writeStrategy.finishAndCloseFile(); /* Must flush the last, partially filled batch; otherwise the row-count check below would come up short. */
+ writeStrategy.close();
+
+ OrcReadStrategy readStrategy = new OrcReadStrategy();
+ readStrategy.init(hadoopConf);
+ List<String> readFiles = readStrategy.getFileNamesByPath(TMP_PATH); /* No commit step runs in this test, so the output is presumably still under tmp_path. */
+ Assertions.assertEquals(1, readFiles.size());
+ String readFilePath = readFiles.get(0);
+
+ SeaTunnelRowType readRowType =
readStrategy.getSeaTunnelRowTypeInfo(readFilePath);
+ Assertions.assertEquals(
+ BasicType.STRING_TYPE.getSqlType(),
readRowType.getFieldType(0).getSqlType());
+ List<SeaTunnelRow> readRows = new ArrayList<>();
+ Collector<SeaTunnelRow> readCollector =
+ new Collector<SeaTunnelRow>() {
+ @Override
+ public void collect(SeaTunnelRow record) {
+ Assertions.assertTrue(record.getField(0) instanceof
String);
+ readRows.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null; /* Presumably unused by OrcReadStrategy.read in this test; confirm. */
+ }
+ };
+ readStrategy.read(readFilePath, "test", readCollector);
+ Assertions.assertEquals(ORC_WRITE_NUMBER, readRows.size()); /* Every written row, including those from the final partial batch, must be read back. */
+ readStrategy.close(); /* NOTE(review): skipped if an assertion above fails; try/finally would guarantee the reader is closed. */
+ }
+}