This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3fc5917063 [Improve][connector-file-base] Improve the write 
performance of ORC file format (#10114)
3fc5917063 is described below

commit 3fc59170630e6c48e20cf5e77a151377e99f46e9
Author: shfshihuafeng <[email protected]>
AuthorDate: Fri Dec 5 01:19:34 2025 +0800

    [Improve][connector-file-base] Improve the write performance of ORC file 
format (#10114)
---
 .../file/sink/writer/OrcWriteStrategy.java         |  30 +++++-
 .../file/writer/OrcWriteStrategyTest.java          | 109 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 366c9bb82a..339067e1c8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -61,10 +61,12 @@ import java.util.Map;
 
 public class OrcWriteStrategy extends AbstractWriteStrategy<Writer> {
     private final LinkedHashMap<String, Writer> beingWrittenWriter;
+    private final LinkedHashMap<String, VectorizedRowBatch> 
vectorizedRowBatches;
 
     public OrcWriteStrategy(FileSinkConfig fileSinkConfig) {
         super(fileSinkConfig);
+        // Both maps are keyed by the path of the file being written. LinkedHashMap keeps
+        // insertion order, so finishAndCloseFile() closes files in the order they were opened.
         this.beingWrittenWriter = new LinkedHashMap<>();
+        this.vectorizedRowBatches = new LinkedHashMap<>();
     }
 
     @Override
@@ -72,8 +74,8 @@ public class OrcWriteStrategy extends 
AbstractWriteStrategy<Writer> {
         super.write(seaTunnelRow);
         String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
         Writer writer = getOrCreateOutputStream(filePath);
-        TypeDescription schema = buildSchemaWithRowType();
-        VectorizedRowBatch rowBatch = schema.createRowBatch();
+        VectorizedRowBatch rowBatch = getOrCreateVectorizedRowBatch(filePath);
+
         int i = 0;
         int row = rowBatch.size++;
         for (Integer index : sinkColumnsIndexInRow) {
@@ -83,8 +85,11 @@ public class OrcWriteStrategy extends 
AbstractWriteStrategy<Writer> {
             i++;
         }
         try {
-            writer.addRowBatch(rowBatch);
-            rowBatch.reset();
+            if (rowBatch.size == rowBatch.getMaxSize()) {
+                writer.addRowBatch(rowBatch);
+                rowBatch.reset();
+            }
+
         } catch (IOException e) {
             throw CommonError.fileOperationFailed("OrcFile", "write", 
filePath, e);
         }
@@ -95,6 +100,11 @@ public class OrcWriteStrategy extends 
AbstractWriteStrategy<Writer> {
         this.beingWrittenWriter.forEach(
                 (k, v) -> {
                     try {
+                        VectorizedRowBatch rowBatch = 
getOrCreateVectorizedRowBatch(k);
+                        if (rowBatch.size > 0) {
+                            v.addRowBatch(rowBatch);
+                            rowBatch.reset();
+                        }
                         v.close();
                     } catch (IOException e) {
                         String errorMsg =
@@ -106,9 +116,21 @@ public class OrcWriteStrategy extends 
AbstractWriteStrategy<Writer> {
                     }
                     needMoveFiles.put(k, getTargetLocation(k));
                 });
+        this.vectorizedRowBatches.clear();
         this.beingWrittenWriter.clear();
     }
 
+    /**
+     * Returns the {@link VectorizedRowBatch} that buffers pending rows for {@code filePath}.
+     *
+     * <p>On first use an empty batch is built from the sink schema and cached, so rows for the
+     * same file accumulate across {@code write} calls until the batch is full or the file closes.
+     */
+    private VectorizedRowBatch getOrCreateVectorizedRowBatch(@NonNull String filePath) {
+        return this.vectorizedRowBatches.computeIfAbsent(
+                filePath, path -> buildSchemaWithRowType().createRowBatch());
+    }
+
     @Override
     public Writer getOrCreateOutputStream(@NonNull String filePath) {
         Writer writer = this.beingWrittenWriter.get(filePath);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
new file mode 100644
index 0000000000..7dbc8bd9de
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+@Slf4j
+public class OrcWriteStrategyTest {
+    private static final String TMP_PATH = "file:///tmp/seatunnel/orc/batch/test";
+    // Deliberately not a multiple of ORC's default row-batch size (1024): the test covers both
+    // the full-batch flush inside write() and the trailing partial-batch flush on file close.
+    private static final int ORC_WRITE_NUMBER = 2000;
+
+    /**
+     * Writes {@link #ORC_WRITE_NUMBER} single-column rows through {@link OrcWriteStrategy}, reads
+     * the resulting file back and verifies that every row arrives with its original value, in
+     * write order.
+     */
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testOrcWriteWithBatch() throws Exception {
+        Map<String, Object> writeConfig = new HashMap<>();
+        writeConfig.put("tmp_path", TMP_PATH);
+        writeConfig.put("path", "file:///tmp/seatunnel/orc/batch");
+        writeConfig.put("file_format_type", FileFormat.ORC.name());
+
+        SeaTunnelRowType writeRowType =
+                new SeaTunnelRowType(
+                        new String[] {"f1_text"},
+                        new SeaTunnelDataType[] {
+                            BasicType.STRING_TYPE,
+                        });
+        FileSinkConfig writeSinkConfig =
+                new FileSinkConfig(ConfigFactory.parseMap(writeConfig), writeRowType);
+        OrcWriteStrategy writeStrategy = new OrcWriteStrategy(writeSinkConfig);
+
+        OrcReadStrategyTest.LocalConf hadoopConf =
+                new OrcReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        writeStrategy.setCatalogTable(
+                CatalogTableUtil.getCatalogTable("test", null, null, "test", writeRowType));
+        writeStrategy.init(hadoopConf, "test1", "test1", 0);
+        writeStrategy.beginTransaction(1L);
+        for (int i = 0; i < ORC_WRITE_NUMBER; i++) {
+            writeStrategy.write(new SeaTunnelRow(new Object[] {"test_" + i}));
+        }
+        writeStrategy.finishAndCloseFile();
+        writeStrategy.close();
+
+        OrcReadStrategy readStrategy = new OrcReadStrategy();
+        readStrategy.init(hadoopConf);
+        List<String> readFiles = readStrategy.getFileNamesByPath(TMP_PATH);
+        Assertions.assertEquals(1, readFiles.size());
+        String readFilePath = readFiles.get(0);
+
+        SeaTunnelRowType readRowType = readStrategy.getSeaTunnelRowTypeInfo(readFilePath);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE.getSqlType(), readRowType.getFieldType(0).getSqlType());
+        List<SeaTunnelRow> readRows = new ArrayList<>();
+        Collector<SeaTunnelRow> readCollector =
+                new Collector<SeaTunnelRow>() {
+                    @Override
+                    public void collect(SeaTunnelRow record) {
+                        // Compare the value, not just its type: rows are buffered in one reused
+                        // batch per file, so a misplaced or stale row would still be a String.
+                        Assertions.assertEquals("test_" + readRows.size(), record.getField(0));
+                        readRows.add(record);
+                    }
+
+                    @Override
+                    public Object getCheckpointLock() {
+                        return null;
+                    }
+                };
+        readStrategy.read(readFilePath, "test", readCollector);
+        Assertions.assertEquals(ORC_WRITE_NUMBER, readRows.size());
+        readStrategy.close();
+    }
+}

Reply via email to