This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 116fffc2969 [feat](jni-writer) Support JNI-based write framework for 
custom writers (#60756)
116fffc2969 is described below

commit 116fffc29699f274375086412764077788c2974f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Feb 15 00:28:23 2026 +0800

    [feat](jni-writer) Support JNI-based write framework for custom writers 
(#60756)
    
    ### What problem does this PR solve?
    
    Related #60768
    
    Introduce a symmetric JNI writer mechanism that complements the existing
    JNI scanner framework, enabling custom file writers to be implemented in
    Java and invoked through JNI from the C++ query executor.
    
    Key changes:
    - Add VJniFormatTransformer for delegating writes to Java via JNI
    - Extend TTVFTableSink with writer_type and writer_class fields
    - Add JniWriter base class and LocalFileJniWriter reference
    implementation
    - Integrate with file_format_transformer factory routing logic
    
    The implementation follows the same patterns as JniScanner for
    consistency:
    - Reuses class loading mechanism (Jni::Util::get_jni_scanner_class)
    - Mirrors constructor signature: (int batchSize, Map<String,String>
    params)
    - Supports schema caching; Java writers can expose performance metrics via
    getStatistics(), which the C++ side resolves but does not collect yet
---
 .../runtime/vfile_format_transformer_factory.cpp   |  23 ++++
 be/src/vec/runtime/vjni_format_transformer.cpp     | 136 +++++++++++++++++++++
 be/src/vec/runtime/vjni_format_transformer.h       |  74 +++++++++++
 be/src/vec/sink/writer/vtvf_table_writer.cpp       |  27 ++--
 build.sh                                           |   4 +-
 .../org/apache/doris/common/jni/JniWriter.java     |  99 +++++++++++++++
 fe/be-java-extensions/java-writer/pom.xml          |  75 ++++++++++++
 .../apache/doris/writer/LocalFileJniWriter.java    | 117 ++++++++++++++++++
 .../java-writer/src/main/resources/package.xml     |  41 +++++++
 fe/be-java-extensions/pom.xml                      |   1 +
 .../org/apache/doris/planner/TVFTableSink.java     |  10 ++
 gensrc/thrift/DataSinks.thrift                     |   7 ++
 12 files changed, 603 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/runtime/vfile_format_transformer_factory.cpp 
b/be/src/vec/runtime/vfile_format_transformer_factory.cpp
index bbd415e2452..69751cc637f 100644
--- a/be/src/vec/runtime/vfile_format_transformer_factory.cpp
+++ b/be/src/vec/runtime/vfile_format_transformer_factory.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vjni_format_transformer.h"
 #include "vec/runtime/vorc_transformer.h"
 #include "vec/runtime/vparquet_transformer.h"
 
@@ -32,6 +33,30 @@ Status create_tvf_format_transformer(const TTVFTableSink& tvf_sink, RuntimeState
                                      io::FileWriter* file_writer,
                                      const VExprContextSPtrs& output_vexpr_ctxs,
                                      std::unique_ptr<VFileFormatTransformer>* result) {
+    // JNI writer path: delegate the write to a Java-side writer. `file_writer` is not used
+    // here (the TVF writer passes nullptr for JNI sinks); the Java writer is expected to open
+    // its own output, whose location and format options are forwarded in `writer_params`.
+    if (tvf_sink.__isset.writer_type && tvf_sink.writer_type == TTVFWriterType::JNI) {
+        if (!tvf_sink.__isset.writer_class || tvf_sink.writer_class.empty()) {
+            return Status::InternalError("writer_class is required when writer_type is JNI");
+        }
+        std::map<std::string, std::string> writer_params;
+        if (tvf_sink.__isset.properties) {
+            writer_params = tvf_sink.properties;
+        }
+        writer_params["file_path"] = tvf_sink.file_path;
+        if (tvf_sink.__isset.column_separator) {
+            writer_params["column_separator"] = tvf_sink.column_separator;
+        }
+        if (tvf_sink.__isset.line_delimiter) {
+            writer_params["line_delimiter"] = tvf_sink.line_delimiter;
+        }
+        *result = std::make_unique<VJniFormatTransformer>(
+                state, output_vexpr_ctxs, tvf_sink.writer_class, std::move(writer_params));
+        return Status::OK();
+    }
+
+    // Native writer path
     TFileFormatType::type format = tvf_sink.file_format;
     switch (format) {
     case TFileFormatType::FORMAT_CSV_PLAIN: {
diff --git a/be/src/vec/runtime/vjni_format_transformer.cpp 
b/be/src/vec/runtime/vjni_format_transformer.cpp
new file mode 100644
index 00000000000..ef7ed6048e3
--- /dev/null
+++ b/be/src/vec/runtime/vjni_format_transformer.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vjni_format_transformer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris::vectorized {
+
+VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state,
+                                             const VExprContextSPtrs& output_vexpr_ctxs,
+                                             std::string writer_class,
+                                             std::map<std::string, std::string> writer_params)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, false),
+          _writer_class(std::move(writer_class)),
+          _writer_params(std::move(writer_params)) {}
+
+// Loads the Java writer class, resolves every method ID this transformer uses and only then
+// constructs the writer through its (int batchSize, Map<String,String> params) constructor.
+// Resolving first means a class missing open/write/close/getStatistics is rejected before
+// any Java writer object (and whatever its constructor acquires) exists.
+Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) {
+    // Load writer class via the same class loader as JniScanner
+    Jni::GlobalClass jni_writer_cls;
+    RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls));
+
+    // Get constructor: (int batchSize, Map<String,String> params)
+    Jni::MethodId writer_constructor;
+    RETURN_IF_ERROR(
+            jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &writer_constructor));
+
+    // Resolve instance method IDs before creating the writer
+    RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open));
+    RETURN_IF_ERROR(
+            jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", &_jni_writer_write));
+    RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V", &_jni_writer_close));
+    // getStatistics is resolved so non-conforming classes are rejected early; this
+    // transformer does not call it yet.
+    RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics", "()Ljava/util/Map;",
+                                              &_jni_writer_get_statistics));
+
+    // Convert C++ params map to a Java map
+    Jni::LocalObject hashmap_object;
+    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params, &hashmap_object));
+
+    // Create writer instance
+    RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor)
+                            .with_arg(static_cast<jint>(batch_size))
+                            .with_arg(hashmap_object)
+                            .call(&_jni_writer_obj));
+    return Status::OK();
+}
+
+// Creates the Java writer and calls its open(). `_opened` is set as soon as the Java writer
+// instance exists, so close() still reaches the Java writer's close() if its open() throws,
+// letting the writer release anything it acquired before failing.
+Status VJniFormatTransformer::open() {
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(Jni::Env::Get(&env));
+
+    int batch_size = _state->batch_size();
+    RETURN_IF_ERROR(_init_jni_writer(env, batch_size));
+    _opened = true;
+
+    RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_open).call());
+    RETURN_ERROR_IF_EXC(env);
+    return Status::OK();
+}
+
+// Hands one non-empty block to the Java writer: JniConnector::to_java_table describes the
+// block's columns in a metadata array whose address is passed, together with the cached
+// schema strings, in the String map consumed by the Java writer's write().
+Status VJniFormatTransformer::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(Jni::Env::Get(&env));
+
+    // 1. Convert Block to Java table metadata (column addresses). The JniConnector
+    //    helpers take a non-const Block*.
+    Block* mutable_block = const_cast<Block*>(&block);
+    std::unique_ptr<long[]> input_table;
+    RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table));
+
+    // 2. Cache schema on first call; later blocks are assumed to share the first block's
+    //    schema.
+    if (!_schema_cached) {
+        auto schema = JniConnector::parse_table_schema(mutable_block);
+        _cached_required_fields = std::move(schema.first);
+        _cached_columns_types = std::move(schema.second);
+        _schema_cached = true;
+    }
+
+    // 3. Build input params map for Java writer. `input_table` owns the metadata behind
+    //    "meta_address" and must stay alive until the Java write() call below returns.
+    std::map<std::string, std::string> input_params = {
+            {"meta_address", std::to_string(reinterpret_cast<int64_t>(input_table.get()))},
+            {"required_fields", _cached_required_fields},
+            {"columns_types", _cached_columns_types}};
+
+    // 4. Convert to Java Map and call writer.write(inputParams)
+    Jni::LocalObject input_map;
+    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map));
+
+    RETURN_IF_ERROR(
+            _jni_writer_obj.call_void_method(env, _jni_writer_write).with_arg(input_map).call());
+    RETURN_ERROR_IF_EXC(env);
+
+    _cur_written_rows += block.rows();
+    return Status::OK();
+}
+
+// Calls the Java writer's close() at most once, and only if open() created a Java writer.
+Status VJniFormatTransformer::close() {
+    if (_closed || !_opened) {
+        return Status::OK();
+    }
+    _closed = true;
+
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(Jni::Env::Get(&env));
+
+    RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_close).call());
+    RETURN_ERROR_IF_EXC(env);
+
+    return Status::OK();
+}
+
+// The Java writer owns the output and does not report its size back to C++; returning 0
+// disables the C++ size-based auto-split.
+int64_t VJniFormatTransformer::written_len() {
+    return 0;
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vjni_format_transformer.h 
b/be/src/vec/runtime/vjni_format_transformer.h
new file mode 100644
index 00000000000..aa63d65e516
--- /dev/null
+++ b/be/src/vec/runtime/vjni_format_transformer.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "util/jni-util.h"
+#include "vfile_format_transformer.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+/**
+ * VJniFormatTransformer is a VFileFormatTransformer implementation that delegates
+ * write operations to a Java-side JniWriter via JNI. It sits alongside
+ * VCSVTransformer/VParquetTransformer/VOrcTransformer as a peer implementation.
+ *
+ * The Java writer class must extend org.apache.doris.common.jni.JniWriter and
+ * follow the same constructor signature as JniScanner:
+ * (int batchSize, Map<String,String> params).
+ */
+class VJniFormatTransformer final : public VFileFormatTransformer {
+public:
+    VJniFormatTransformer(RuntimeState* state, const VExprContextSPtrs& output_vexpr_ctxs,
+                          std::string writer_class,
+                          std::map<std::string, std::string> writer_params);
+
+    ~VJniFormatTransformer() override = default;
+
+    // Instantiates the configured Java writer and calls its open().
+    Status open() override;
+    // Forwards a non-empty block to the Java writer's write(); empty blocks are skipped.
+    Status write(const Block& block) override;
+    // Invokes the Java writer's close() at most once.
+    Status close() override;
+    // Always 0: output size is tracked on the Java side, so C++ size-based splitting is off.
+    int64_t written_len() override;
+
+private:
+    // Loads the Java class, creates the writer instance and resolves its method IDs.
+    Status _init_jni_writer(JNIEnv* env, int batch_size);
+
+    std::string _writer_class;
+    std::map<std::string, std::string> _writer_params;
+
+    // JNI handles (same pattern as JniConnector)
+    Jni::GlobalObject _jni_writer_obj;
+    Jni::MethodId _jni_writer_open;
+    Jni::MethodId _jni_writer_write;
+    Jni::MethodId _jni_writer_close;
+    Jni::MethodId _jni_writer_get_statistics;
+
+    // Schema cache (computed on first write, reused afterwards)
+    bool _schema_cached = false;
+    std::string _cached_required_fields;
+    std::string _cached_columns_types;
+
+    bool _opened = false;
+    bool _closed = false;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
b/be/src/vec/sink/writer/vtvf_table_writer.cpp
index 7d82e8ef471..63dc3850c97 100644
--- a/be/src/vec/sink/writer/vtvf_table_writer.cpp
+++ b/be/src/vec/sink/writer/vtvf_table_writer.cpp
@@ -87,21 +87,38 @@ Status VTVFTableWriter::close(Status status) {
 }
 
 Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
-    TFileType::type file_type = _tvf_sink.file_type;
-    std::map<std::string, std::string> properties;
-    if (_tvf_sink.__isset.properties) {
-        properties = _tvf_sink.properties;
+    bool use_jni = _tvf_sink.__isset.writer_type && _tvf_sink.writer_type == TTVFWriterType::JNI;
+
+    if (!use_jni) {
+        // Native path: create file writer via FileFactory
+        TFileType::type file_type = _tvf_sink.file_type;
+        std::map<std::string, std::string> properties;
+        if (_tvf_sink.__isset.properties) {
+            properties = _tvf_sink.properties;
+        }
+
+        _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
+                file_type, _state->exec_env(), {}, properties, file_name,
+                {.write_file_cache = false, .sync_file_data = false}));
     }
 
-    _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
-            file_type, _state->exec_env(), {}, properties, file_name,
-            {.write_file_cache = false, .sync_file_data = false}));
-
-    RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state, _file_writer_impl.get(),
+    // The JNI path creates no native FileWriter (NOTE(review): confirm close() tolerates a
+    // null _file_writer_impl). The Java writer gets its output path from the sink's
+    // file_path, which is the same on every call, so give it this call's `file_name` (the
+    // path the native branch writes to) through a per-call copy of the sink.
+    TTVFTableSink jni_sink;
+    if (use_jni) {
+        jni_sink = _tvf_sink;
+        jni_sink.__set_file_path(file_name);
+    }
+
+    // Factory creates either JNI or native transformer
+    RETURN_IF_ERROR(create_tvf_format_transformer(use_jni ? jni_sink : _tvf_sink, _state,
+                                                  use_jni ? nullptr : _file_writer_impl.get(),
                                                   _vec_output_expr_ctxs, &_vfile_writer));
 
     VLOG_DEBUG << "TVF table writer created file: " << file_name
-               << ", format=" << _tvf_sink.file_format
+               << ", format=" << _tvf_sink.file_format << ", use_jni=" << use_jni
                << ", query_id=" << print_id(_state->query_id());
 
     return _vfile_writer->open();
diff --git a/build.sh b/build.sh
index 580f5d3a047..2bb49e2673f 100755
--- a/build.sh
+++ b/build.sh
@@ -584,6 +584,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
     # modules+=("be-java-extensions/lakesoul-scanner")
     modules+=("be-java-extensions/preload-extensions")
     modules+=("be-java-extensions/${HADOOP_DEPS_NAME}")
+    modules+=("be-java-extensions/java-writer")
 
     # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
     if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
@@ -920,6 +921,7 @@ EOF
     extensions_modules+=("preload-extensions")
     extensions_modules+=("iceberg-metadata-scanner")
     extensions_modules+=("${HADOOP_DEPS_NAME}")
+    extensions_modules+=("java-writer")
 
     if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
         IFS=',' read -r -a ignore_modules <<<"${BE_EXTENSION_IGNORE}"
@@ -1048,4 +1050,4 @@ if [[ -n "${DORIS_POST_BUILD_HOOK}" ]]; then
     eval "${DORIS_POST_BUILD_HOOK}"
 fi
 
-exit 0
\ No newline at end of file
+exit 0
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
new file mode 100644
index 00000000000..2dfefd2ac79
--- /dev/null
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jni;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * JniWriter is the base class for JNI-based writers, symmetric to JniScanner.
+ * Constructor signature: (int batchSize, Map<String, String> params) matches JniScanner
+ * to reuse the same class loading mechanism (Jni::Util::get_jni_scanner_class).
+ *
+ * Lifecycle: open() -> write() [repeated] -> close()
+ * close() should be safe to call even if open() did not complete, so that resources
+ * acquired before a failure can still be released.
+ */
+public abstract class JniWriter {
+    protected int batchSize;
+    protected Map<String, String> params;
+    protected ColumnType[] columnTypes;
+    protected String[] fields;
+    protected long writeTime = 0;
+    protected long readTableTime = 0;
+
+    public JniWriter(int batchSize, Map<String, String> params) {
+        this.batchSize = batchSize;
+        this.params = params;
+    }
+
+    public abstract void open() throws IOException;
+
+    /**
+     * JNI entry point: receives C++ Block metadata, creates a ReadableTable,
+     * then delegates to writeInternal.
+     *
+     * @param inputParams block metadata ("meta_address") plus the schema description
+     *                    ("required_fields", "columns_types"), which is parsed on the first call
+     * @throws IOException if the schema description is inconsistent or the write fails
+     */
+    public void write(Map<String, String> inputParams) throws IOException {
+        // Parse and cache schema on first call
+        if (columnTypes == null) {
+            initSchema(inputParams.get("required_fields"), inputParams.get("columns_types"));
+        }
+
+        long startRead = System.nanoTime();
+        VectorTable inputTable = VectorTable.createReadableTable(inputParams);
+        readTableTime += System.nanoTime() - startRead;
+
+        long startWrite = System.nanoTime();
+        writeInternal(inputTable);
+        writeTime += System.nanoTime() - startWrite;
+    }
+
+    /**
+     * Parses the comma-separated field names and '#'-separated type strings sent by C++ into
+     * {@link #fields} and {@link #columnTypes}. Both are assigned only after every type has been
+     * parsed, so a failure cannot leave a partially initialized schema that later calls skip.
+     */
+    private void initSchema(String requiredFields, String columnsTypes) throws IOException {
+        if (requiredFields == null || requiredFields.isEmpty()) {
+            fields = new String[0];
+            columnTypes = new ColumnType[0];
+            return;
+        }
+        if (columnsTypes == null) {
+            throw new IOException("columns_types is required when required_fields is set");
+        }
+        String[] names = requiredFields.split(",");
+        String[] typeStrs = columnsTypes.split("#");
+        if (names.length != typeStrs.length) {
+            throw new IOException("Schema mismatch: " + names.length + " required_fields but "
+                    + typeStrs.length + " columns_types");
+        }
+        ColumnType[] types = new ColumnType[typeStrs.length];
+        for (int i = 0; i < typeStrs.length; i++) {
+            types[i] = ColumnType.parseType(names[i], typeStrs[i]);
+        }
+        fields = names;
+        columnTypes = types;
+    }
+
+    protected abstract void writeInternal(VectorTable inputTable) throws IOException;
+
+    public abstract void close() throws IOException;
+
+    /**
+     * Performance metrics. Key format: "metricType:metricName"
+     * Supported types: timer, counter, bytes (same as JniScanner).
+     */
+    public Map<String, String> getStatistics() {
+        return Collections.emptyMap();
+    }
+
+    public long getWriteTime() {
+        return writeTime;
+    }
+
+    public long getReadTableTime() {
+        return readTableTime;
+    }
+}
diff --git a/fe/be-java-extensions/java-writer/pom.xml 
b/fe/be-java-extensions/java-writer/pom.xml
new file mode 100644
index 00000000000..e0bbaf20389
--- /dev/null
+++ b/fe/be-java-extensions/java-writer/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>be-java-extensions</artifactId>
+        <groupId>org.apache.doris</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>java-writer</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>java-writer</finalName>
+        <directory>${project.basedir}/target/</directory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/resources/package.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java
 
b/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java
new file mode 100644
index 00000000000..0b0bab8aa5f
--- /dev/null
+++ 
b/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.writer;
+
+import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * LocalFileJniWriter writes C++ Block data to local CSV files via JNI.
+ * Loaded by C++ as: org/apache/doris/writer/LocalFileJniWriter
+ *
+ * Each value is written with toString() (NULL as \N), columns are joined with
+ * "column_separator" (default ",") and rows end with "line_delimiter" (default newline).
+ * The file is encoded as UTF-8.
+ */
+public class LocalFileJniWriter extends JniWriter {
+    private static final Logger LOG = Logger.getLogger(LocalFileJniWriter.class);
+
+    private String filePath;
+    private String columnSeparator;
+    private String lineDelimiter;
+    private BufferedWriter fileWriter;
+    private long writtenRows = 0;
+    private long writtenBytes = 0;
+
+    public LocalFileJniWriter(int batchSize, Map<String, String> params) {
+        super(batchSize, params);
+        this.filePath = params.get("file_path");
+        this.columnSeparator = params.getOrDefault("column_separator", ",");
+        this.lineDelimiter = params.getOrDefault("line_delimiter", "\n");
+        LOG.info("LocalFileJniWriter created: filePath=" + filePath
+                + ", columnSeparator=" + columnSeparator
+                + ", batchSize=" + batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (filePath == null || filePath.isEmpty()) {
+            throw new IOException("LocalFileJniWriter requires a non-empty file_path parameter");
+        }
+        LOG.info("LocalFileJniWriter opening file: " + filePath);
+        // Encode explicitly as UTF-8 rather than with the JVM's platform-default charset.
+        fileWriter = new BufferedWriter(
+                new OutputStreamWriter(new FileOutputStream(filePath), StandardCharsets.UTF_8));
+        LOG.info("LocalFileJniWriter opened file successfully: " + filePath);
+    }
+
+    @Override
+    protected void writeInternal(VectorTable inputTable) throws IOException {
+        int numRows = inputTable.getNumRows();
+        int numCols = inputTable.getNumColumns();
+        if (numRows == 0) {
+            return;
+        }
+
+        Object[][] data = inputTable.getMaterializedData();
+        StringBuilder sb = new StringBuilder();
+
+        for (int row = 0; row < numRows; row++) {
+            for (int col = 0; col < numCols; col++) {
+                if (col > 0) {
+                    sb.append(columnSeparator);
+                }
+                Object val = data[col][row];
+                if (val != null) {
+                    sb.append(val.toString());
+                } else {
+                    sb.append("\\N");
+                }
+            }
+            sb.append(lineDelimiter);
+        }
+
+        String output = sb.toString();
+        fileWriter.write(output);
+        writtenRows += numRows;
+        // Count bytes in the charset the file is actually written with.
+        writtenBytes += output.getBytes(StandardCharsets.UTF_8).length;
+        // Per-batch progress is DEBUG-only; logging at INFO would emit lines for every block.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("LocalFileJniWriter wrote " + numRows + " rows (numCols=" + numCols
+                    + "), totalWrittenRows=" + writtenRows + ", totalWrittenBytes=" + writtenBytes);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.info("LocalFileJniWriter closing: filePath=" + filePath
+                + ", totalWrittenRows=" + writtenRows + ", totalWrittenBytes=" + writtenBytes);
+        if (fileWriter != null) {
+            try {
+                // BufferedWriter.close() flushes buffered data before closing the stream.
+                fileWriter.close();
+            } finally {
+                // Never retry closing the same writer, even if close() threw.
+                fileWriter = null;
+            }
+        }
+        LOG.info("LocalFileJniWriter closed successfully: " + filePath);
+    }
+
+    @Override
+    public Map<String, String> getStatistics() {
+        Map<String, String> stats = new HashMap<>();
+        stats.put("counter:WrittenRows", String.valueOf(writtenRows));
+        stats.put("bytes:WrittenBytes", String.valueOf(writtenBytes));
+        stats.put("timer:WriteTime", String.valueOf(writeTime));
+        stats.put("timer:ReadTableTime", String.valueOf(readTableTime));
+        return stats;
+    }
+}
diff --git a/fe/be-java-extensions/java-writer/src/main/resources/package.xml 
b/fe/be-java-extensions/java-writer/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/java-writer/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                </excludes>
+            </unpackOptions>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 8151cd179d1..eab071c9c5a 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -34,6 +34,7 @@ under the License.
         <module>preload-extensions</module>
         <module>trino-connector-scanner</module>
         <module>hadoop-deps</module>
+        <module>java-writer</module>
     </modules>
 
     <parent>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
index 1d6d44d52ae..c511336767c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
@@ -31,6 +31,7 @@ import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TTVFTableSink;
+import org.apache.doris.thrift.TTVFWriterType;
 
 import com.google.common.collect.Maps;
 
@@ -152,6 +153,15 @@ public class TVFTableSink extends DataSink {
             tSink.setHadoopConfig(backendConnectProps);
         }
 
+        // Set writer_type: JNI if a non-blank writer_class is specified, otherwise NATIVE
+        String writerClass = properties.get("writer_class");
+        if (writerClass != null && !writerClass.trim().isEmpty()) {
+            tSink.setWriterType(TTVFWriterType.JNI);
+            tSink.setWriterClass(writerClass.trim());
+        } else {
+            tSink.setWriterType(TTVFWriterType.NATIVE);
+        }
+
         tDataSink = new TDataSink(TDataSinkType.TVF_TABLE_SINK);
         tDataSink.setTvfTableSink(tSink);
     }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index c5e176ee155..cb817b7dbe8 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -453,6 +453,11 @@ struct TDictionarySink {
 struct TBlackholeSink {
 }
 
+enum TTVFWriterType {
+    NATIVE = 0,
+    JNI = 1
+}
+
 struct TTVFTableSink {
     1: optional string tvf_name              // "local", "s3", "hdfs"
     2: optional string file_path
@@ -467,6 +472,8 @@ struct TTVFTableSink {
     11: optional map<string, string> hadoop_config
     12: optional PlanNodes.TFileCompressType compression_type
     13: optional i64 backend_id              // local TVF: specify BE
+    14: optional TTVFWriterType writer_type   // NATIVE or JNI; unset behaves as NATIVE
+    15: optional string writer_class          // Java class name (required when writer_type=JNI)
 }
 
 struct TDataSink {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to