This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 116fffc2969 [feat](jni-writer) Support JNI-based write framework for
custom writers (#60756)
116fffc2969 is described below
commit 116fffc29699f274375086412764077788c2974f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Feb 15 00:28:23 2026 +0800
[feat](jni-writer) Support JNI-based write framework for custom writers
(#60756)
### What problem does this PR solve?
Related: #60768
Introduce a symmetric JNI writer mechanism that complements the existing
JNI scanner framework, enabling custom file writers to be implemented in
Java and invoked through JNI from the C++ query executor.
Key changes:
- Add VJniFormatTransformer for delegating writes to Java via JNI
- Extend TTVFTableSink with writer_type and writer_class fields
- Add JniWriter base class and LocalFileJniWriter reference
implementation
- Integrate with file_format_transformer factory routing logic
The implementation follows the same patterns as JniScanner for
consistency:
- Reuses class loading mechanism (Jni::Util::get_jni_scanner_class)
- Mirrors constructor signature: (int batchSize, Map<String,String>
params)
- Supports schema caching and performance instrumentation
---
.../runtime/vfile_format_transformer_factory.cpp | 23 ++++
be/src/vec/runtime/vjni_format_transformer.cpp | 136 +++++++++++++++++++++
be/src/vec/runtime/vjni_format_transformer.h | 74 +++++++++++
be/src/vec/sink/writer/vtvf_table_writer.cpp | 27 ++--
build.sh | 4 +-
.../org/apache/doris/common/jni/JniWriter.java | 99 +++++++++++++++
fe/be-java-extensions/java-writer/pom.xml | 75 ++++++++++++
.../apache/doris/writer/LocalFileJniWriter.java | 117 ++++++++++++++++++
.../java-writer/src/main/resources/package.xml | 41 +++++++
fe/be-java-extensions/pom.xml | 1 +
.../org/apache/doris/planner/TVFTableSink.java | 10 ++
gensrc/thrift/DataSinks.thrift | 7 ++
12 files changed, 603 insertions(+), 11 deletions(-)
diff --git a/be/src/vec/runtime/vfile_format_transformer_factory.cpp
b/be/src/vec/runtime/vfile_format_transformer_factory.cpp
index bbd415e2452..69751cc637f 100644
--- a/be/src/vec/runtime/vfile_format_transformer_factory.cpp
+++ b/be/src/vec/runtime/vfile_format_transformer_factory.cpp
@@ -23,6 +23,7 @@
#include <vector>
#include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vjni_format_transformer.h"
#include "vec/runtime/vorc_transformer.h"
#include "vec/runtime/vparquet_transformer.h"
@@ -32,6 +33,28 @@ Status create_tvf_format_transformer(const TTVFTableSink&
tvf_sink, RuntimeState
io::FileWriter* file_writer,
const VExprContextSPtrs&
output_vexpr_ctxs,
std::unique_ptr<VFileFormatTransformer>*
result) {
+ // JNI writer path: delegate to Java-side writer
+ if (tvf_sink.__isset.writer_type && tvf_sink.writer_type ==
TTVFWriterType::JNI) {
+ if (!tvf_sink.__isset.writer_class) {
+ return Status::InternalError("writer_class is required when
writer_type is JNI");
+ }
+ std::map<std::string, std::string> writer_params;
+ if (tvf_sink.__isset.properties) {
+ writer_params = tvf_sink.properties;
+ }
+ writer_params["file_path"] = tvf_sink.file_path;
+ if (tvf_sink.__isset.column_separator) {
+ writer_params["column_separator"] = tvf_sink.column_separator;
+ }
+ if (tvf_sink.__isset.line_delimiter) {
+ writer_params["line_delimiter"] = tvf_sink.line_delimiter;
+ }
+ result->reset(new VJniFormatTransformer(state, output_vexpr_ctxs,
tvf_sink.writer_class,
+ std::move(writer_params)));
+ return Status::OK();
+ }
+
+ // Native writer path
TFileFormatType::type format = tvf_sink.file_format;
switch (format) {
case TFileFormatType::FORMAT_CSV_PLAIN: {
diff --git a/be/src/vec/runtime/vjni_format_transformer.cpp
b/be/src/vec/runtime/vjni_format_transformer.cpp
new file mode 100644
index 00000000000..ef7ed6048e3
--- /dev/null
+++ b/be/src/vec/runtime/vjni_format_transformer.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vjni_format_transformer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris::vectorized {
+
+VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state,
+ const VExprContextSPtrs&
output_vexpr_ctxs,
+ std::string writer_class,
+ std::map<std::string,
std::string> writer_params)
+ : VFileFormatTransformer(state, output_vexpr_ctxs, false),
+ _writer_class(std::move(writer_class)),
+ _writer_params(std::move(writer_params)) {}
+
+Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) {
+ // Load writer class via the same class loader as JniScanner
+ Jni::GlobalClass jni_writer_cls;
+ RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env,
_writer_class.c_str(), &jni_writer_cls));
+
+ // Get constructor: (int batchSize, Map<String,String> params)
+ Jni::MethodId writer_constructor;
+ RETURN_IF_ERROR(
+ jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V",
&writer_constructor));
+
+ // Convert C++ params map to Java HashMap
+ Jni::LocalObject hashmap_object;
+ RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params,
&hashmap_object));
+
+ // Create writer instance
+ RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor)
+ .with_arg((jint)batch_size)
+ .with_arg(hashmap_object)
+ .call(&_jni_writer_obj));
+
+ // Resolve method IDs
+ RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V",
&_jni_writer_open));
+ RETURN_IF_ERROR(
+ jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V",
&_jni_writer_write));
+ RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V",
&_jni_writer_close));
+ RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics",
"()Ljava/util/Map;",
+ &_jni_writer_get_statistics));
+ return Status::OK();
+}
+
+Status VJniFormatTransformer::open() {
+ JNIEnv* env = nullptr;
+ RETURN_IF_ERROR(Jni::Env::Get(&env));
+
+ int batch_size = _state->batch_size();
+ RETURN_IF_ERROR(_init_jni_writer(env, batch_size));
+
+ RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env,
_jni_writer_open).call());
+ RETURN_ERROR_IF_EXC(env);
+
+ _opened = true;
+ return Status::OK();
+}
+
+Status VJniFormatTransformer::write(const Block& block) {
+ if (block.rows() == 0) {
+ return Status::OK();
+ }
+
+ JNIEnv* env = nullptr;
+ RETURN_IF_ERROR(Jni::Env::Get(&env));
+
+ // 1. Convert Block to Java table metadata (column addresses)
+ Block* mutable_block = const_cast<Block*>(&block);
+ std::unique_ptr<long[]> input_table;
+ RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table));
+
+ // 2. Cache schema on first call
+ if (!_schema_cached) {
+ auto schema = JniConnector::parse_table_schema(mutable_block);
+ _cached_required_fields = schema.first;
+ _cached_columns_types = schema.second;
+ _schema_cached = true;
+ }
+
+ // 3. Build input params map for Java writer
+ std::map<std::string, std::string> input_params = {
+ {"meta_address", std::to_string((long)input_table.get())},
+ {"required_fields", _cached_required_fields},
+ {"columns_types", _cached_columns_types}};
+
+ // 4. Convert to Java Map and call writer.write(inputParams)
+ Jni::LocalObject input_map;
+ RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params,
&input_map));
+
+ RETURN_IF_ERROR(
+ _jni_writer_obj.call_void_method(env,
_jni_writer_write).with_arg(input_map).call());
+ RETURN_ERROR_IF_EXC(env);
+
+ _cur_written_rows += block.rows();
+ return Status::OK();
+}
+
+Status VJniFormatTransformer::close() {
+ if (_closed || !_opened) {
+ return Status::OK();
+ }
+ _closed = true;
+
+ JNIEnv* env = nullptr;
+ RETURN_IF_ERROR(Jni::Env::Get(&env));
+
+ RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env,
_jni_writer_close).call());
+ RETURN_ERROR_IF_EXC(env);
+
+ return Status::OK();
+}
+
+int64_t VJniFormatTransformer::written_len() {
+ // JNI writer manages file size on Java side; return 0 to disable C++
auto-split.
+ return 0;
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vjni_format_transformer.h
b/be/src/vec/runtime/vjni_format_transformer.h
new file mode 100644
index 00000000000..aa63d65e516
--- /dev/null
+++ b/be/src/vec/runtime/vjni_format_transformer.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "util/jni-util.h"
+#include "vfile_format_transformer.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+/**
+ * VJniFormatTransformer is a VFileFormatTransformer implementation that
delegates
+ * write operations to a Java-side JniWriter via JNI. It sits alongside
+ * VCSVTransformer/VParquetTransformer/VOrcTransformer as a peer
implementation.
+ *
+ * The Java writer class must extend org.apache.doris.common.jni.JniWriter and
+ * follow the same constructor signature as JniScanner: (int batchSize,
Map<String,String> params).
+ */
+class VJniFormatTransformer final : public VFileFormatTransformer {
+public:
+ VJniFormatTransformer(RuntimeState* state, const VExprContextSPtrs&
output_vexpr_ctxs,
+ std::string writer_class,
+ std::map<std::string, std::string> writer_params);
+
+ ~VJniFormatTransformer() override = default;
+
+ Status open() override;
+ Status write(const Block& block) override;
+ Status close() override;
+ int64_t written_len() override;
+
+private:
+ Status _init_jni_writer(JNIEnv* env, int batch_size);
+
+ std::string _writer_class;
+ std::map<std::string, std::string> _writer_params;
+
+ // JNI handles (same pattern as JniConnector)
+ Jni::GlobalObject _jni_writer_obj;
+ Jni::MethodId _jni_writer_open;
+ Jni::MethodId _jni_writer_write;
+ Jni::MethodId _jni_writer_close;
+ Jni::MethodId _jni_writer_get_statistics;
+
+ // Schema cache (computed on first write, reused afterwards)
+ bool _schema_cached = false;
+ std::string _cached_required_fields;
+ std::string _cached_columns_types;
+
+ bool _opened = false;
+ bool _closed = false;
+};
+
+} // namespace doris::vectorized
+
+#include "common/compile_check_end.h"
diff --git a/be/src/vec/sink/writer/vtvf_table_writer.cpp
b/be/src/vec/sink/writer/vtvf_table_writer.cpp
index 7d82e8ef471..63dc3850c97 100644
--- a/be/src/vec/sink/writer/vtvf_table_writer.cpp
+++ b/be/src/vec/sink/writer/vtvf_table_writer.cpp
@@ -87,21 +87,28 @@ Status VTVFTableWriter::close(Status status) {
}
Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
- TFileType::type file_type = _tvf_sink.file_type;
- std::map<std::string, std::string> properties;
- if (_tvf_sink.__isset.properties) {
- properties = _tvf_sink.properties;
+ bool use_jni = _tvf_sink.__isset.writer_type && _tvf_sink.writer_type ==
TTVFWriterType::JNI;
+
+ if (!use_jni) {
+ // Native path: create file writer via FileFactory
+ TFileType::type file_type = _tvf_sink.file_type;
+ std::map<std::string, std::string> properties;
+ if (_tvf_sink.__isset.properties) {
+ properties = _tvf_sink.properties;
+ }
+
+ _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
+ file_type, _state->exec_env(), {}, properties, file_name,
+ {.write_file_cache = false, .sync_file_data = false}));
}
- _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
- file_type, _state->exec_env(), {}, properties, file_name,
- {.write_file_cache = false, .sync_file_data = false}));
-
- RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state,
_file_writer_impl.get(),
+ // Factory creates either JNI or native transformer
+ RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state,
+ use_jni ? nullptr :
_file_writer_impl.get(),
_vec_output_expr_ctxs,
&_vfile_writer));
VLOG_DEBUG << "TVF table writer created file: " << file_name
- << ", format=" << _tvf_sink.file_format
+ << ", format=" << _tvf_sink.file_format << ", use_jni=" <<
use_jni
<< ", query_id=" << print_id(_state->query_id());
return _vfile_writer->open();
diff --git a/build.sh b/build.sh
index 580f5d3a047..2bb49e2673f 100755
--- a/build.sh
+++ b/build.sh
@@ -584,6 +584,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
# modules+=("be-java-extensions/lakesoul-scanner")
modules+=("be-java-extensions/preload-extensions")
modules+=("be-java-extensions/${HADOOP_DEPS_NAME}")
+ modules+=("be-java-extensions/java-writer")
# If the BE_EXTENSION_IGNORE variable is not empty, remove the modules
that need to be ignored from FE_MODULES
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
@@ -920,6 +921,7 @@ EOF
extensions_modules+=("preload-extensions")
extensions_modules+=("iceberg-metadata-scanner")
extensions_modules+=("${HADOOP_DEPS_NAME}")
+ extensions_modules+=("java-writer")
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
IFS=',' read -r -a ignore_modules <<<"${BE_EXTENSION_IGNORE}"
@@ -1048,4 +1050,4 @@ if [[ -n "${DORIS_POST_BUILD_HOOK}" ]]; then
eval "${DORIS_POST_BUILD_HOOK}"
fi
-exit 0
\ No newline at end of file
+exit 0
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
new file mode 100644
index 00000000000..2dfefd2ac79
--- /dev/null
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniWriter.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jni;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * JniWriter is the base class for JNI-based writers, symmetric to JniScanner.
+ * Constructor signature: (int batchSize, Map<String, String> params) matches
JniScanner
+ * to reuse the same class loading mechanism
(Jni::Util::get_jni_scanner_class).
+ *
+ * Lifecycle: open() -> write() [repeated] -> close()
+ */
+public abstract class JniWriter {
+ protected int batchSize;
+ protected Map<String, String> params;
+ protected ColumnType[] columnTypes;
+ protected String[] fields;
+ protected long writeTime = 0;
+ protected long readTableTime = 0;
+
+ public JniWriter(int batchSize, Map<String, String> params) {
+ this.batchSize = batchSize;
+ this.params = params;
+ }
+
+ public abstract void open() throws IOException;
+
+ /**
+ * JNI entry point: receives C++ Block metadata, creates a ReadableTable,
+ * then delegates to writeInternal.
+ */
+ public void write(Map<String, String> inputParams) throws IOException {
+ // Parse and cache schema on first call
+ if (columnTypes == null) {
+ String requiredFields = inputParams.get("required_fields");
+ String columnsTypes = inputParams.get("columns_types");
+ if (requiredFields != null && !requiredFields.isEmpty()) {
+ fields = requiredFields.split(",");
+ String[] typeStrs = columnsTypes.split("#");
+ columnTypes = new ColumnType[typeStrs.length];
+ for (int i = 0; i < typeStrs.length; i++) {
+ columnTypes[i] = ColumnType.parseType(fields[i],
typeStrs[i]);
+ }
+ } else {
+ fields = new String[0];
+ columnTypes = new ColumnType[0];
+ }
+ }
+
+ long startRead = System.nanoTime();
+ VectorTable inputTable = VectorTable.createReadableTable(inputParams);
+ readTableTime += System.nanoTime() - startRead;
+
+ long startWrite = System.nanoTime();
+ writeInternal(inputTable);
+ writeTime += System.nanoTime() - startWrite;
+ }
+
+ protected abstract void writeInternal(VectorTable inputTable) throws
IOException;
+
+ public abstract void close() throws IOException;
+
+ /**
+ * Performance metrics. Key format: "metricType:metricName"
+ * Supported types: timer, counter, bytes (same as JniScanner).
+ */
+ public Map<String, String> getStatistics() {
+ return Collections.emptyMap();
+ }
+
+ public long getWriteTime() {
+ return writeTime;
+ }
+
+ public long getReadTableTime() {
+ return readTableTime;
+ }
+}
diff --git a/fe/be-java-extensions/java-writer/pom.xml
b/fe/be-java-extensions/java-writer/pom.xml
new file mode 100644
index 00000000000..e0bbaf20389
--- /dev/null
+++ b/fe/be-java-extensions/java-writer/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>be-java-extensions</artifactId>
+ <groupId>org.apache.doris</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>java-writer</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>java-writer</finalName>
+ <directory>${project.basedir}/target/</directory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/resources/package.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java
b/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java
new file mode 100644
index 00000000000..0b0bab8aa5f
--- /dev/null
+++
b/fe/be-java-extensions/java-writer/src/main/java/org/apache/doris/writer/LocalFileJniWriter.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.writer;
+
+import org.apache.doris.common.jni.JniWriter;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * LocalFileJniWriter writes C++ Block data to local CSV files via JNI.
+ * Loaded by C++ as: org/apache/doris/writer/LocalFileJniWriter
+ */
+public class LocalFileJniWriter extends JniWriter {
+ private static final Logger LOG =
Logger.getLogger(LocalFileJniWriter.class);
+
+ private String filePath;
+ private String columnSeparator;
+ private String lineDelimiter;
+ private BufferedWriter fileWriter;
+ private long writtenRows = 0;
+ private long writtenBytes = 0;
+
+ public LocalFileJniWriter(int batchSize, Map<String, String> params) {
+ super(batchSize, params);
+ this.filePath = params.get("file_path");
+ this.columnSeparator = params.getOrDefault("column_separator", ",");
+ this.lineDelimiter = params.getOrDefault("line_delimiter", "\n");
+ LOG.info("LocalFileJniWriter created: filePath=" + filePath
+ + ", columnSeparator=" + columnSeparator
+ + ", batchSize=" + batchSize);
+ }
+
+ @Override
+ public void open() throws IOException {
+ LOG.info("LocalFileJniWriter opening file: " + filePath);
+ fileWriter = new BufferedWriter(new FileWriter(filePath));
+ LOG.info("LocalFileJniWriter opened file successfully: " + filePath);
+ }
+
+ @Override
+ protected void writeInternal(VectorTable inputTable) throws IOException {
+ int numRows = inputTable.getNumRows();
+ int numCols = inputTable.getNumColumns();
+ LOG.info("LocalFileJniWriter writeInternal: numRows=" + numRows + ",
numCols=" + numCols);
+ if (numRows == 0) {
+ return;
+ }
+
+ Object[][] data = inputTable.getMaterializedData();
+ StringBuilder sb = new StringBuilder();
+
+ for (int row = 0; row < numRows; row++) {
+ for (int col = 0; col < numCols; col++) {
+ if (col > 0) {
+ sb.append(columnSeparator);
+ }
+ Object val = data[col][row];
+ if (val != null) {
+ sb.append(val.toString());
+ } else {
+ sb.append("\\N");
+ }
+ }
+ sb.append(lineDelimiter);
+ }
+
+ String output = sb.toString();
+ fileWriter.write(output);
+ writtenRows += numRows;
+ writtenBytes += output.getBytes().length;
+ LOG.info("LocalFileJniWriter wrote " + numRows + " rows,
totalWrittenRows=" + writtenRows
+ + ", totalWrittenBytes=" + writtenBytes);
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("LocalFileJniWriter closing: filePath=" + filePath
+ + ", totalWrittenRows=" + writtenRows + ", totalWrittenBytes="
+ writtenBytes);
+ if (fileWriter != null) {
+ fileWriter.flush();
+ fileWriter.close();
+ fileWriter = null;
+ }
+ LOG.info("LocalFileJniWriter closed successfully: " + filePath);
+ }
+
+ @Override
+ public Map<String, String> getStatistics() {
+ Map<String, String> stats = new java.util.HashMap<>();
+ stats.put("counter:WrittenRows", String.valueOf(writtenRows));
+ stats.put("bytes:WrittenBytes", String.valueOf(writtenBytes));
+ stats.put("timer:WriteTime", String.valueOf(writeTime));
+ stats.put("timer:ReadTableTime", String.valueOf(readTableTime));
+ return stats;
+ }
+}
diff --git a/fe/be-java-extensions/java-writer/src/main/resources/package.xml
b/fe/be-java-extensions/java-writer/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/java-writer/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/Log4j2Plugins.dat</exclude>
+ </excludes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 8151cd179d1..eab071c9c5a 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -34,6 +34,7 @@ under the License.
<module>preload-extensions</module>
<module>trino-connector-scanner</module>
<module>hadoop-deps</module>
+ <module>java-writer</module>
</modules>
<parent>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
index 1d6d44d52ae..c511336767c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TVFTableSink.java
@@ -31,6 +31,7 @@ import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TTVFTableSink;
+import org.apache.doris.thrift.TTVFWriterType;
import com.google.common.collect.Maps;
@@ -152,6 +153,15 @@ public class TVFTableSink extends DataSink {
tSink.setHadoopConfig(backendConnectProps);
}
+ // Set writer_type: JNI if writer_class is specified, otherwise NATIVE
+ String writerClass = properties.get("writer_class");
+ if (writerClass != null) {
+ tSink.setWriterType(TTVFWriterType.JNI);
+ tSink.setWriterClass(writerClass);
+ } else {
+ tSink.setWriterType(TTVFWriterType.NATIVE);
+ }
+
tDataSink = new TDataSink(TDataSinkType.TVF_TABLE_SINK);
tDataSink.setTvfTableSink(tSink);
}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index c5e176ee155..cb817b7dbe8 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -453,6 +453,11 @@ struct TDictionarySink {
struct TBlackholeSink {
}
+enum TTVFWriterType {
+ NATIVE = 0,
+ JNI = 1
+}
+
struct TTVFTableSink {
1: optional string tvf_name // "local", "s3", "hdfs"
2: optional string file_path
@@ -467,6 +472,8 @@ struct TTVFTableSink {
11: optional map<string, string> hadoop_config
12: optional PlanNodes.TFileCompressType compression_type
13: optional i64 backend_id // local TVF: specify BE
+ 14: optional TTVFWriterType writer_type // NATIVE or JNI
+ 15: optional string writer_class // Java class name (required
when writer_type=JNI)
}
struct TDataSink {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]